merged hyracks_asterix_stabilization r1683:1702

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1704 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index c99d621..165fccd 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -89,8 +89,4 @@
     public IPhysicalPropertiesVector getDeliveredPhysicalProperties();
 
     public void computeDeliveredPhysicalProperties(IOptimizationContext context) throws AlgebricksException;
-
-    public void setHostQueryContext(Object context);
-
-    public Object getHostQueryContext();
 }
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
index c406f2b..de9cf29 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
@@ -30,25 +30,24 @@
     public static final String ALGEBRICKS_NS = "algebricks";
 
     // comparisons
-    public final static FunctionIdentifier EQ = new FunctionIdentifier(ALGEBRICKS_NS, "eq", 2, true);
-    public final static FunctionIdentifier LE = new FunctionIdentifier(ALGEBRICKS_NS, "le", 2, true);
-    public final static FunctionIdentifier GE = new FunctionIdentifier(ALGEBRICKS_NS, "ge", 2, true);
-    public final static FunctionIdentifier LT = new FunctionIdentifier(ALGEBRICKS_NS, "lt", 2, true);
-    public final static FunctionIdentifier GT = new FunctionIdentifier(ALGEBRICKS_NS, "gt", 2, true);
-    public final static FunctionIdentifier NEQ = new FunctionIdentifier(ALGEBRICKS_NS, "neq", 2, true);
+    public final static FunctionIdentifier EQ = new FunctionIdentifier(ALGEBRICKS_NS, "eq", 2);
+    public final static FunctionIdentifier LE = new FunctionIdentifier(ALGEBRICKS_NS, "le", 2);
+    public final static FunctionIdentifier GE = new FunctionIdentifier(ALGEBRICKS_NS, "ge", 2);
+    public final static FunctionIdentifier LT = new FunctionIdentifier(ALGEBRICKS_NS, "lt", 2);
+    public final static FunctionIdentifier GT = new FunctionIdentifier(ALGEBRICKS_NS, "gt", 2);
+    public final static FunctionIdentifier NEQ = new FunctionIdentifier(ALGEBRICKS_NS, "neq", 2);
 
     // booleans
-    public final static FunctionIdentifier NOT = new FunctionIdentifier(ALGEBRICKS_NS, "not", 1, true);
+    public final static FunctionIdentifier NOT = new FunctionIdentifier(ALGEBRICKS_NS, "not", 1);
     public final static FunctionIdentifier AND = new FunctionIdentifier(ALGEBRICKS_NS, "and",
-            FunctionIdentifier.VARARGS, true);
-    public final static FunctionIdentifier OR = new FunctionIdentifier(ALGEBRICKS_NS, "or", FunctionIdentifier.VARARGS,
-            true);
+            FunctionIdentifier.VARARGS);
+    public final static FunctionIdentifier OR = new FunctionIdentifier(ALGEBRICKS_NS, "or", FunctionIdentifier.VARARGS);
 
     // numerics
-    public final static FunctionIdentifier NUMERIC_ADD = new FunctionIdentifier(ALGEBRICKS_NS, "numeric-add", 2, true);
+    public final static FunctionIdentifier NUMERIC_ADD = new FunctionIdentifier(ALGEBRICKS_NS, "numeric-add", 2);
 
     // nulls
-    public final static FunctionIdentifier IS_NULL = new FunctionIdentifier(ALGEBRICKS_NS, "is-null", 1, true);
+    public final static FunctionIdentifier IS_NULL = new FunctionIdentifier(ALGEBRICKS_NS, "is-null", 1);
 
     private static final Map<FunctionIdentifier, ComparisonKind> comparisonFunctions = new HashMap<FunctionIdentifier, ComparisonKind>();
     static {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
index 252f23c..548f646 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
@@ -14,36 +14,31 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.functions;
 
-public final class FunctionIdentifier {
-    final private String namespace;
-    final private String name;
-    final private boolean isBuiltin;
-    final private int arity;
+import java.io.Serializable;
+
+public final class FunctionIdentifier implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String namespace;
+    private final String name;
+    private final int arity;
 
     public final static int VARARGS = -1;
 
-    public FunctionIdentifier(String namespace, String name, boolean isBuiltin) {
-        this.namespace = namespace;
-        this.name = name;
-        this.arity = VARARGS;
-        this.isBuiltin = isBuiltin;
+    public FunctionIdentifier(String namespace, String name) {
+        this(namespace, name, VARARGS);
     }
 
-    public FunctionIdentifier(String namespace, String name, int arity, boolean isBuiltin) {
+    public FunctionIdentifier(String namespace, String name, int arity) {
         this.namespace = namespace;
         this.name = name;
         this.arity = arity;
-        this.isBuiltin = isBuiltin;
     }
 
     public String getName() {
         return name;
     }
 
-    public boolean isBuiltin() {
-        return isBuiltin;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (super.equals(o)) {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 4acd53b..dc0edfe 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -59,7 +59,6 @@
     protected IPhysicalOperator physicalOperator;
     private final Map<String, Object> annotations = new HashMap<String, Object>();
     private boolean bJobGenEnabled = true;
-    private Object hostQueryContext;
 
     final protected List<Mutable<ILogicalOperator>> inputs;
     // protected List<LogicalOperatorReference> outputs;
@@ -168,14 +167,6 @@
         return bJobGenEnabled;
     }
 
-    public void setHostQueryContext(Object context) {
-        this.hostQueryContext = context;
-    }
-
-    public Object getHostQueryContext() {
-        return hostQueryContext;
-    }
-
     @Override
     public IVariableTypeEnvironment computeInputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index 9f75bbc..f314e09 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -18,6 +18,8 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
@@ -83,33 +85,34 @@
     private class PartitionWriter implements Runnable, IInputChannelMonitor {
         private PartitionChannel pc;
 
-        private int nAvailableFrames;
+        private final AtomicInteger nAvailableFrames;
 
-        private boolean eos;
+        private final AtomicBoolean eos;
 
-        private boolean failed;
+        private final AtomicBoolean failed;
 
         public PartitionWriter(PartitionChannel pc) {
             this.pc = pc;
-            nAvailableFrames = 0;
-            eos = false;
+            nAvailableFrames = new AtomicInteger(0);
+            eos = new AtomicBoolean(false);
+            failed = new AtomicBoolean(false);
         }
 
         @Override
         public synchronized void notifyFailure(IInputChannel channel) {
-            failed = true;
+            failed.set(true);
             notifyAll();
         }
 
         @Override
         public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
-            nAvailableFrames += nFrames;
+            nAvailableFrames.addAndGet(nFrames);
             notifyAll();
         }
 
         @Override
         public synchronized void notifyEndOfStream(IInputChannel channel) {
-            eos = true;
+            eos.set(true);
             notifyAll();
         }
 
@@ -123,26 +126,22 @@
                 channel.open();
                 mpw.open();
                 while (true) {
-                    int nAvailableFrames;
-                    boolean eos;
-                    boolean failed;
-                    synchronized (this) {
-                        nAvailableFrames = this.nAvailableFrames;
-                        eos = this.eos;
-                        failed = this.failed;
-                    }
-                    if (nAvailableFrames > 0) {
+                    if (nAvailableFrames.get() > 0) {
                         ByteBuffer buffer = channel.getNextBuffer();
-                        --nAvailableFrames;
+                        nAvailableFrames.decrementAndGet();
                         mpw.nextFrame(buffer);
                         channel.recycleBuffer(buffer);
-                    } else if (eos) {
+                    } else if (eos.get()) {
                         break;
-                    } else if (failed) {
+                    } else if (failed.get()) {
                         throw new HyracksDataException("Failure occurred on input");
                     } else {
                         try {
-                            wait();
+                            synchronized (this) {
+                                if (nAvailableFrames.get() <= 0 && !eos.get() && !failed.get()) {
+                                    wait();
+                                }
+                            }
                         } catch (InterruptedException e) {
                             throw new HyracksDataException(e);
                         }