merged hyracks_asterix_stabilization r1606:1627

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1629 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 01fa9da..5234d2c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -44,5 +44,6 @@
     WRITE_RESULT,
     INSERT_DELETE,
     INDEX_INSERT_DELETE,
-    UPDATE
+    UPDATE,
+    EXTENSION_OPERATOR
 }
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 48f320b..95b320f 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -5,6 +5,7 @@
     ASSIGN,
     BROADCAST_EXCHANGE,
     BTREE_SEARCH,
+    STATS,
     DATASOURCE_SCAN,
     EMPTY_TUPLE_SOURCE,
     EXTERNAL_GROUP_BY,
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
index 4268a42..96afc8e 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.functions;
 
+
 public interface IFunctionInfo {
     FunctionIdentifier getFunctionIdentifier();
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
new file mode 100644
index 0000000..10ecb1e
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+
+/**
+ * @author rico
+ */
+public abstract class AbstractExtensibleLogicalOperator implements IOperatorExtension {
+
+    private AbstractLogicalOperator.ExecutionMode mode = AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
+    protected List<LogicalVariable> schema;
+    protected IPhysicalOperator physicalOperator;
+
+    @Override
+    public ExecutionMode getExecutionMode() {
+        return mode;
+    }
+
+    @Override
+    public void setExecutionMode(ExecutionMode mode) {
+        this.mode = mode;
+    }
+
+    @Override
+    public void setSchema(List<LogicalVariable> schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public IPhysicalOperator getPhysicalOperator() {
+        return physicalOperator;
+    }
+
+    @Override
+    public void setPhysicalOperator(IPhysicalOperator physicalOperator) {
+        this.physicalOperator = physicalOperator;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
new file mode 100644
index 0000000..101b6f5
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * @author rico
+ */
+public class ExtensionOperator extends AbstractLogicalOperator {
+
+    private IOperatorExtension delegate;
+
+    public ExtensionOperator(IOperatorExtension delegate) {
+        super();
+        if (delegate == null) {
+            throw new IllegalArgumentException("delegate cannot be null!");
+        }
+        this.delegate = delegate;
+        setExecutionMode(delegate.getExecutionMode());
+    }
+
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+        delegate.setSchema(schema);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitExtensionOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return this.delegate.isMap();
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return this.createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.EXTENSION_OPERATOR;
+    }
+
+    public IOperatorExtension getNewInstanceOfDelegateOperator() {
+        return delegate.newInstance();
+    }
+
+    @Override
+    public List<LogicalVariable> getSchema() {
+        return this.schema;
+    }
+
+    @Override
+    public ExecutionMode getExecutionMode() {
+        return delegate.getExecutionMode();
+    }
+
+    @Override
+    public void setExecutionMode(ExecutionMode mode) {
+        delegate.setExecutionMode(mode);
+    }
+
+    @Override
+    public IPhysicalOperator getPhysicalOperator() {
+        return delegate.getPhysicalOperator();
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeInputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return this.createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
new file mode 100644
index 0000000..98c3301
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * @author rico
+ */
+public interface IOperatorExtension {
+
+    void setExecutionMode(ExecutionMode mode);
+
+    boolean isMap();
+
+    public IOperatorExtension newInstance();
+
+    boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException;
+
+    void setSchema(List<LogicalVariable> schema);
+
+    IPhysicalOperator getPhysicalOperator();
+
+    void setPhysicalOperator(IPhysicalOperator physicalOperator);
+
+    ExecutionMode getExecutionMode();
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 3ff1534..0539cbe 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -64,6 +64,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -637,4 +638,10 @@
         return null;
     }
 
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index fbae89e..d845bf7 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -41,6 +41,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -110,6 +111,14 @@
     }
 
     @Override
+    public Boolean visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
+        ExtensionOperator aop = (ExtensionOperator) copyAndSubstituteVar(op, arg);
+        if (aop.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR)
+            return Boolean.FALSE;
+        return Boolean.TRUE;
+    }
+
+    @Override
     public Boolean visitGroupByOperator(GroupByOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         // require the same physical operator, otherwise delivers different data
@@ -678,8 +687,7 @@
                 throws AlgebricksException {
             ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newExpressions, Arrays.asList(op.getExpressions()));
-            return new PartitioningSplitOperator(newExpressions.toArray(new Mutable[0]),
-                    op.hasDefault());
+            return new PartitioningSplitOperator(newExpressions.toArray(new Mutable[0]), op.hasDefault());
         }
 
         @Override
@@ -809,6 +817,11 @@
                         deepCopyExpressionRef(pair.second)));
             return newOrdersAndExprs;
         }
+
+        @Override
+        public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+            return new ExtensionOperator(op.getNewInstanceOfDelegateOperator());
+        }
     }
 
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 3c81250..562bb4c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -54,6 +54,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -414,4 +415,10 @@
             return false;
     }
 
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 2f0cf07..9b2f5a0 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -318,4 +319,10 @@
         }
     }
 
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext arg) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 0e2b4a2..994c6cb 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -49,6 +49,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -245,4 +246,9 @@
     public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {

         return null;

     }

+

+    @Override

+    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {

+        return null;

+    }

 }

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index a917756..9295179 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -48,6 +48,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -267,4 +268,10 @@
         return null;

     }

 

+    @Override

+    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {

+        standardLayout(op);

+        return null;

+    }

+

 }

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 717a2ab..11e56ca 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -51,6 +51,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -409,4 +410,10 @@
         IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op);

         env.substituteProducedVariable(arg.first, arg.second);

     }

+

+    @Override

+    public Void visitExtensionOperator(ExtensionOperator op, Pair<LogicalVariable, LogicalVariable> arg)

+            throws AlgebricksException {

+        return null;

+    }

 }

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 124a8e4..56487cc 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -49,6 +49,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -291,4 +292,9 @@
         return null;

     }

 

+    @Override

+    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {

+        return null;

+    }

+

 }

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 9ab1a5e..a94c78e 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -48,6 +48,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -223,8 +224,7 @@
     @Override
     public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append(
-                "unnest-map " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
+        addIndent(buffer, indent).append("unnest-map " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
         return buffer.toString();
     }
 
@@ -346,4 +346,11 @@
         return buffer.toString();
     }
 
+    @Override
+    public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("statistics collection");
+        return buffer.toString();
+    }
+
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 93633c8..6b5949e 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -68,6 +69,8 @@
 
     public R visitSelectOperator(SelectOperator op, T arg) throws AlgebricksException;
 
+    public R visitExtensionOperator(ExtensionOperator op, T arg) throws AlgebricksException;
+
     public R visitProjectOperator(ProjectOperator op, T arg) throws AlgebricksException;
 
     public R visitPartitioningSplitOperator(PartitioningSplitOperator op, T arg) throws AlgebricksException;
diff --git a/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index f671224..7a4d71c4 100644
--- a/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -188,4 +188,4 @@
     public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
         return FN_MAP.get(fid);
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index c07de41..81cf511 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -16,6 +16,8 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
+
 /**
  * Base class of the {@link ICCApplicationContext} and the
  * {@link INCApplicationContext}.
@@ -39,4 +41,10 @@
      * @return
      */
     public Serializable getDistributedState();
+
+    public void setMessageBroker(IMessageBroker messageBroker);
+
+    public IMessageBroker getMessageBroker();
+
+    public String getApplicationName();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index 2e02101..4250112 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -66,4 +66,6 @@
      * @return The Cluster Controller Context.
      */
     public ICCContext getCCContext();
+
+
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index 042d341..e964d66 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -27,4 +27,6 @@
     public TaskAttemptId getTaskAttemptId();
 
     public ICounterContext getCounterContext();
+
+    public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
new file mode 100644
index 0000000..96ad7fe
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.messages;
+
+import java.io.Serializable;
+
+/**
+ * @author rico
+ * 
+ */
+public interface IMessage extends Serializable {
+
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
new file mode 100644
index 0000000..35ae813
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.messages;
+
+/**
+ * @author rico
+ * 
+ */
+public interface IMessageBroker {
+
+    public void receivedMessage(IMessage message, String nodeId);
+
+}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 47e3bff..dd0e39e 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationDestroyWork;
+import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
@@ -393,6 +394,12 @@
                     workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
                     return;
                 }
+                case SEND_APPLICATION_MESSAGE: {
+                    CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(), rsf
+                            .getAppName(), rsf.getNodeId()));
+                    return;
+                }
             }
             LOGGER.warning("Unknown function: " + fn.getFunctionId());
         }
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
new file mode 100644
index 0000000..22ff84d
--- /dev/null
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+/**
+ * @author rico
+ * 
+ */
+public class ApplicationMessageWork extends AbstractWork {
+
+    private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
+    private byte[] message;
+    private String nodeId;
+    private ClusterControllerService ccs;
+    private String appName;
+
+    public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, String appName, String nodeId) {
+        this.ccs = ccs;
+        this.nodeId = nodeId;
+        this.message = message;
+        this.appName = appName;
+    }
+
+    @Override
+    public void run() {
+
+        final ApplicationContext ctx = ccs.getApplicationMap().get(appName);
+        try {
+            final IMessage data = (IMessage) ctx.deserialize(message);
+            (new Thread() {
+                public void run() {
+                    ctx.getMessageBroker().receivedMessage(data, nodeId);
+                }
+            }).start();
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Error in stats reporting", e);
+        } catch (ClassNotFoundException e) {
+            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in stats reporting", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "nodeID: " + nodeId;
+    }
+
+}
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 9f29fbd..fc755a5 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -38,6 +38,7 @@
 
 import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
@@ -54,6 +55,7 @@
     protected Properties deploymentDescriptor;
     protected IBootstrap bootstrap;
     protected Serializable distributedState;
+    protected IMessageBroker messageBroker;
 
     public ApplicationContext(ServerContext serverCtx, String appName) throws IOException {
         this.serverCtx = serverCtx;
@@ -205,4 +207,14 @@
     public ApplicationStatus getStatus() {
         return status;
     }
+
+    @Override
+    public void setMessageBroker(IMessageBroker messageBroker) {
+        this.messageBroker = messageBroker;
+    }
+
+    @Override
+    public IMessageBroker getMessageBroker() {
+        return this.messageBroker;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index a03f5c9..a25250a 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.messages.IMessage;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
@@ -47,4 +48,6 @@
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
+
+    public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index f45f449..e5f0efc 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -74,10 +74,47 @@
         CREATE_APPLICATION,
         DESTROY_APPLICATION,
         REPORT_PARTITION_AVAILABILITY,
+        SEND_APPLICATION_MESSAGE,
 
         OTHER
     }
 
+    public static class SendApplicationMessageFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private byte[] serializedMessage;
+        private String nodeId;
+        private String appName;
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public void setNodeId(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public byte[] getMessage() {
+            return serializedMessage;
+        }
+
+        public SendApplicationMessageFunction(byte[] data, String appName, String nodeId) {
+            super();
+            this.serializedMessage = data;
+            this.nodeId = nodeId;
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.SEND_APPLICATION_MESSAGE;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+    }
+
     public static abstract class Function implements Serializable {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index a0dabdd..d789768 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -43,44 +43,40 @@
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
-        CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
-                nodeId);
+        CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception {
-        CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
-                jobId, taskId, nodeId, statistics);
+        CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
+                nodeId, statistics);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
-        CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
-                jobId, taskId, nodeId, details);
+        CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
+                details);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
-        CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
-                jobId, nodeId);
+        CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
-                hbData);
+        CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id, hbData);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
-        CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
-                profiles);
+        CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id, profiles);
         ipcHandle.send(-1, fn, null);
     }
 
@@ -104,4 +100,10 @@
                 nodeId, appName, status);
         ipcHandle.send(-1, fn, null);
     }
+
+    @Override
+    public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
+        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, appName, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3c32d8b..3cab638 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -62,6 +62,7 @@
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.ApplicationMessageWork;
 import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
 import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
 import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
@@ -367,6 +368,12 @@
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
             CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
+                case SEND_APPLICATION_MESSAGE: {
+                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
+                            .getAppName(), amf.getNodeId()));
+                    return;
+                }
                 case START_TASKS: {
                     CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
                     queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
@@ -416,4 +423,8 @@
 
         }
     }
+
+    public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
+        ccs.sendApplicationMessageToCC(data, appName, nodeId);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 7addda8..91e1d51 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -87,7 +87,9 @@
 
     private volatile boolean aborted;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
+    private NodeControllerService ncs;
+
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs) {
         this.joblet = joblet;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
@@ -101,6 +103,7 @@
         failed = false;
         errorBaos = new ByteArrayOutputStream();
         errorWriter = new PrintWriter(errorBaos, true);
+        this.ncs = ncs;
     }
 
     public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -340,4 +343,9 @@
     public IStateObject getStateObject(Object id) {
         return opEnv.getStateObject(id);
     }
+
+    @Override
+    public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
+        this.ncs.sendApplicationMessageToCC(message, this.getJobletContext().getApplicationContext().getApplicationName(), nodeId);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
new file mode 100644
index 0000000..deb1b75
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+/**
+ * @author rico
+ * 
+ */
+public class ApplicationMessageWork extends AbstractWork {
+
+    private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
+    private byte[] message;
+    private String nodeId;
+    private NodeControllerService ncs;
+    private String appName;
+
+    public ApplicationMessageWork(NodeControllerService ncs, byte[] message, String appName, String nodeId) {
+        this.ncs = ncs;
+        this.nodeId = nodeId;
+        this.message = message;
+        this.appName = appName;
+    }
+
+    @Override
+    public void run() {
+
+        NCApplicationContext ctx = ncs.getApplications().get(appName);
+        try {
+            IMessage data = (IMessage) ctx.deserialize(message);
+            if (ctx.getMessageBroker() != null) {
+                ctx.getMessageBroker().receivedMessage(data, nodeId);
+            } else {
+                LOGGER.log(Level.WARNING, "Messsage was sent, but no Message Broker set!");
+            }
+        } catch (IOException e) {
+            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+        } catch (ClassNotFoundException e) {
+            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "nodeID: " + nodeId;
+    }
+
+}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index eb982df..159ecb0 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -18,6 +18,7 @@
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.commons.io.IOUtils;
@@ -86,6 +87,7 @@
                     .notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
         } catch (Exception e) {
             LOGGER.warning("Error creating application: " + e.getMessage());
+            LOGGER.log(Level.WARNING, e.getLocalizedMessage(), e);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 368539c..78101bf 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -108,7 +108,7 @@
                     LOGGER.info("Initializing " + taId + " -> " + han);
                 }
                 final int partition = tid.getPartition();
-                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor());
+                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs);
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
 
                 List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
diff --git a/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/algorithms/BinarySearchAlgorithm.java b/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/algorithms/BinarySearchAlgorithm.java
index a50fc92..e631428 100644
--- a/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/algorithms/BinarySearchAlgorithm.java
+++ b/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/algorithms/BinarySearchAlgorithm.java
@@ -51,8 +51,10 @@
             int cmp = key.compareTo(vector.getBytes(index), vector.getStart(index), vector.getLength(index));
             if (cmp > 0) {
                 left = index + 1;
+                index = left;
             } else if (cmp < 0) {
                 right = index - 1;
+                index = left;
             } else {
                 return true;
             }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 79e4e65..b51132e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -111,13 +111,32 @@
                 for (int i = 0; i < framesLimit - 1; ++i) {
                     inFrames.add(ctx.allocateFrame());
                 }
-                while (runs.size() > 0) {
-                    try {
-                        doPass(runs);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
+                int maxMergeWidth = framesLimit - 1;
+                while (runs.size() > maxMergeWidth) {
+                    int generationSeparator = 0;
+                    while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
+                        int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
+                                runs.size() - maxMergeWidth + 1);
+                        FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
+                                .getSimpleName());
+                        IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
+                        mergeResultWriter.open();
+                        IFrameReader[] runCursors = new RunFileReader[mergeWidth];
+                        for (int i = 0; i < mergeWidth; i++) {
+                            runCursors[i] = runs.get(generationSeparator + i);
+                        }
+                        merge(mergeResultWriter, runCursors);
+                        runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
+                        runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
                     }
                 }
+                if (!runs.isEmpty()) {
+                    IFrameReader[] runCursors = new RunFileReader[runs.size()];
+                    for (int i = 0; i < runCursors.length; i++) {
+                        runCursors[i] = runs.get(i);
+                    }
+                    merge(writer, runCursors);
+                }
             }
         } catch (Exception e) {
             writer.fail();
@@ -127,44 +146,16 @@
         }
     }
 
-    // creates a new run from runs that can fit in memory.
-    private void doPass(List<IFrameReader> runs) throws HyracksDataException {
-        FileReference newRun = null;
-        IFrameWriter writer = this.writer;
-        boolean finalPass = false;
-        if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
-            finalPass = true;
-            for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
-                inFrames.remove(i);
-            }
-        } else {
-            newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
-            writer = new RunFileWriter(newRun, ctx.getIOManager());
-            writer.open();
-        }
+    private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
+        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, comparators,
+                recordDesc);
+        merger.open();
         try {
-            IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
-            for (int i = 0; i < inFrames.size(); i++) {
-                runCursors[i] = runs.get(i);
-            }
-            RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
-                    comparators, recordDesc);
-            merger.open();
-            try {
-                while (merger.nextFrame(outFrame)) {
-                    FrameUtils.flushFrame(outFrame, writer);
-                }
-            } finally {
-                merger.close();
-            }
-            runs.subList(0, inFrames.size()).clear();
-            if (!finalPass) {
-                runs.add(0, ((RunFileWriter) writer).createReader());
+            while (merger.nextFrame(outFrame)) {
+                FrameUtils.flushFrame(outFrame, mergeResultWriter);
             }
         } finally {
-            if (!finalPass) {
-                writer.close();
-            }
+            merger.close();
         }
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index c014ed1..29c28a1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -54,11 +54,11 @@
 
     @Override
     public void open() throws HyracksDataException {
-        tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+        tupleAccessors = new FrameTupleAccessor[runCursors.length];
         Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, inFrames.size(), comparator);
-        tupleIndexes = new int[inFrames.size()];
-        for (int i = 0; i < inFrames.size(); i++) {
+        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
+        tupleIndexes = new int[runCursors.length];
+        for (int i = 0; i < runCursors.length; i++) {
             tupleIndexes[i] = 0;
             int runIndex = topTuples.peek().getRunid();
             runCursors[runIndex].open();
@@ -98,7 +98,7 @@
 
     @Override
     public void close() throws HyracksDataException {
-        for (int i = 0; i < inFrames.size(); ++i) {
+        for (int i = 0; i < runCursors.length; ++i) {
             closeRun(i, runCursors, tupleAccessors);
         }
     }
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 2d747f9..4a1921e 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -58,7 +58,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider,
-                lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive);
+        return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, lowKeyFields,
+                highKeyFields, lowKeyInclusive, highKeyInclusive);
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 1b95ccb..0bd872c 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
 
 public class TestNCApplicationContext implements INCApplicationContext {
     private final IHyracksRootContext rootCtx;
@@ -60,4 +61,22 @@
     public Object getApplicationObject() {
         return appObject;
     }
+
+    @Override
+    public String getApplicationName() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setMessageBroker(IMessageBroker staticticsConnector) {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public IMessageBroker getMessageBroker() {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index b776f55..e18c0b8 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -99,4 +99,10 @@
     public IStateObject getStateObject(Object id) {
         return null;
     }
+
+    @Override
+    public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
+        // TODO Auto-generated method stub
+
+    }
 }
\ No newline at end of file