merged hyracks_asterix_stabilization r1606:1627
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1629 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 01fa9da..5234d2c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -44,5 +44,6 @@
WRITE_RESULT,
INSERT_DELETE,
INDEX_INSERT_DELETE,
- UPDATE
+ UPDATE,
+ EXTENSION_OPERATOR
}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 48f320b..95b320f 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -5,6 +5,7 @@
ASSIGN,
BROADCAST_EXCHANGE,
BTREE_SEARCH,
+ STATS,
DATASOURCE_SCAN,
EMPTY_TUPLE_SOURCE,
EXTERNAL_GROUP_BY,
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
index 4268a42..96afc8e 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.functions;
+
public interface IFunctionInfo {
FunctionIdentifier getFunctionIdentifier();
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
new file mode 100644
index 0000000..10ecb1e
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+
+/**
+ * @author rico
+ */
+public abstract class AbstractExtensibleLogicalOperator implements IOperatorExtension {
+
+ private AbstractLogicalOperator.ExecutionMode mode = AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
+ protected List<LogicalVariable> schema;
+ protected IPhysicalOperator physicalOperator;
+
+ @Override
+ public ExecutionMode getExecutionMode() {
+ return mode;
+ }
+
+ @Override
+ public void setExecutionMode(ExecutionMode mode) {
+ this.mode = mode;
+ }
+
+ @Override
+ public void setSchema(List<LogicalVariable> schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public IPhysicalOperator getPhysicalOperator() {
+ return physicalOperator;
+ }
+
+ @Override
+ public void setPhysicalOperator(IPhysicalOperator physicalOperator) {
+ this.physicalOperator = physicalOperator;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
new file mode 100644
index 0000000..101b6f5
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * @author rico
+ */
+public class ExtensionOperator extends AbstractLogicalOperator {
+
+ private IOperatorExtension delegate;
+
+ public ExtensionOperator(IOperatorExtension delegate) {
+ super();
+ if (delegate == null) {
+ throw new IllegalArgumentException("delegate cannot be null!");
+ }
+ this.delegate = delegate;
+ setExecutionMode(delegate.getExecutionMode());
+ }
+
+ @Override
+ public void recomputeSchema() throws AlgebricksException {
+ schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+ delegate.setSchema(schema);
+ }
+
+ @Override
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+ return visitor.visitExtensionOperator(this, arg);
+ }
+
+ @Override
+ public boolean isMap() {
+ return this.delegate.isMap();
+ }
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return VariablePropagationPolicy.ALL;
+ }
+
+ @Override
+ public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+ return this.createPropagatingAllInputsTypeEnvironment(ctx);
+ }
+
+ @Override
+ public LogicalOperatorTag getOperatorTag() {
+ return LogicalOperatorTag.EXTENSION_OPERATOR;
+ }
+
+ public IOperatorExtension getNewInstanceOfDelegateOperator() {
+ return delegate.newInstance();
+ }
+
+ @Override
+ public List<LogicalVariable> getSchema() {
+ return this.schema;
+ }
+
+ @Override
+ public ExecutionMode getExecutionMode() {
+ return delegate.getExecutionMode();
+ }
+
+ @Override
+ public void setExecutionMode(ExecutionMode mode) {
+ delegate.setExecutionMode(mode);
+ }
+
+ @Override
+ public IPhysicalOperator getPhysicalOperator() {
+ return delegate.getPhysicalOperator();
+ }
+
+ @Override
+ public IVariableTypeEnvironment computeInputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+ return this.createPropagatingAllInputsTypeEnvironment(ctx);
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
new file mode 100644
index 0000000..98c3301
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * @author rico
+ */
+public interface IOperatorExtension {
+
+ void setExecutionMode(ExecutionMode mode);
+
+ boolean isMap();
+
+ public IOperatorExtension newInstance();
+
+ boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException;
+
+ void setSchema(List<LogicalVariable> schema);
+
+ IPhysicalOperator getPhysicalOperator();
+
+ void setPhysicalOperator(IPhysicalOperator physicalOperator);
+
+ ExecutionMode getExecutionMode();
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 3ff1534..0539cbe 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -64,6 +64,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -637,4 +638,10 @@
return null;
}
+ @Override
+ public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext ctx) throws AlgebricksException {
+ propagateFDsAndEquivClasses(op, ctx);
+ return null;
+ }
+
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index fbae89e..d845bf7 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -41,6 +41,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -110,6 +111,14 @@
}
@Override
+ public Boolean visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
+ ExtensionOperator aop = (ExtensionOperator) copyAndSubstituteVar(op, arg);
+ if (aop.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR)
+ return Boolean.FALSE;
+ return Boolean.TRUE;
+ }
+
+ @Override
public Boolean visitGroupByOperator(GroupByOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
// require the same physical operator, otherwise delivers different data
@@ -678,8 +687,7 @@
throws AlgebricksException {
ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newExpressions, Arrays.asList(op.getExpressions()));
- return new PartitioningSplitOperator(newExpressions.toArray(new Mutable[0]),
- op.hasDefault());
+ return new PartitioningSplitOperator(newExpressions.toArray(new Mutable[0]), op.hasDefault());
}
@Override
@@ -809,6 +817,11 @@
deepCopyExpressionRef(pair.second)));
return newOrdersAndExprs;
}
+
+ @Override
+ public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ return new ExtensionOperator(op.getNewInstanceOfDelegateOperator());
+ }
}
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 3c81250..562bb4c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -54,6 +54,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -414,4 +415,10 @@
return false;
}
+ @Override
+ public Void visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
+ mapVariablesStandard(op, arg);
+ return null;
+ }
+
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 2f0cf07..9b2f5a0 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -318,4 +319,10 @@
}
}
+ @Override
+ public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext arg) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 0e2b4a2..994c6cb 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -49,6 +49,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -245,4 +246,9 @@
public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
return null;
}
+
+ @Override
+ public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index a917756..9295179 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -48,6 +48,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -267,4 +268,10 @@
return null;
}
+ @Override
+ public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ standardLayout(op);
+ return null;
+ }
+
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 717a2ab..11e56ca 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -51,6 +51,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -409,4 +410,10 @@
IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op);
env.substituteProducedVariable(arg.first, arg.second);
}
+
+ @Override
+ public Void visitExtensionOperator(ExtensionOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+ throws AlgebricksException {
+ return null;
+ }
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 124a8e4..56487cc 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -49,6 +49,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -291,4 +292,9 @@
return null;
}
+ @Override
+ public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 9ab1a5e..a94c78e 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -48,6 +48,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -223,8 +224,7 @@
@Override
public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append(
- "unnest-map " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
+ addIndent(buffer, indent).append("unnest-map " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
return buffer.toString();
}
@@ -346,4 +346,11 @@
return buffer.toString();
}
+ @Override
+ public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
+ StringBuilder buffer = new StringBuilder();
+ addIndent(buffer, indent).append("statistics collection");
+ return buffer.toString();
+ }
+
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 93633c8..6b5949e 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -37,6 +37,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -68,6 +69,8 @@
public R visitSelectOperator(SelectOperator op, T arg) throws AlgebricksException;
+ public R visitExtensionOperator(ExtensionOperator op, T arg) throws AlgebricksException;
+
public R visitProjectOperator(ProjectOperator op, T arg) throws AlgebricksException;
public R visitPartitioningSplitOperator(PartitioningSplitOperator op, T arg) throws AlgebricksException;
diff --git a/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index f671224..7a4d71c4 100644
--- a/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -188,4 +188,4 @@
public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
return FN_MAP.get(fid);
}
-}
\ No newline at end of file
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index c07de41..81cf511 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -16,6 +16,8 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
+
/**
* Base class of the {@link ICCApplicationContext} and the
* {@link INCApplicationContext}.
@@ -39,4 +41,10 @@
* @return
*/
public Serializable getDistributedState();
+
+ public void setMessageBroker(IMessageBroker messageBroker);
+
+ public IMessageBroker getMessageBroker();
+
+ public String getApplicationName();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index 2e02101..4250112 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -66,4 +66,6 @@
* @return The Cluster Controller Context.
*/
public ICCContext getCCContext();
+
+
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index 042d341..e964d66 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -27,4 +27,6 @@
public TaskAttemptId getTaskAttemptId();
public ICounterContext getCounterContext();
+
+ public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
new file mode 100644
index 0000000..96ad7fe
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.messages;
+
+import java.io.Serializable;
+
+/**
+ * @author rico
+ *
+ */
+public interface IMessage extends Serializable {
+
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
new file mode 100644
index 0000000..35ae813
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.messages;
+
+/**
+ * @author rico
+ *
+ */
+public interface IMessageBroker {
+
+ public void receivedMessage(IMessage message, String nodeId);
+
+}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 47e3bff..dd0e39e 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -40,6 +40,7 @@
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationDestroyWork;
+import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
@@ -393,6 +394,12 @@
workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
return;
}
+ case SEND_APPLICATION_MESSAGE: {
+ CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(), rsf
+ .getAppName(), rsf.getNodeId()));
+ return;
+ }
}
LOGGER.warning("Unknown function: " + fn.getFunctionId());
}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
new file mode 100644
index 0000000..22ff84d
--- /dev/null
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+/**
+ * @author rico
+ *
+ */
+public class ApplicationMessageWork extends AbstractWork {
+
+ private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
+ private byte[] message;
+ private String nodeId;
+ private ClusterControllerService ccs;
+ private String appName;
+
+ public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, String appName, String nodeId) {
+ this.ccs = ccs;
+ this.nodeId = nodeId;
+ this.message = message;
+ this.appName = appName;
+ }
+
+ @Override
+ public void run() {
+
+ final ApplicationContext ctx = ccs.getApplicationMap().get(appName);
+ try {
+ final IMessage data = (IMessage) ctx.deserialize(message);
+ (new Thread() {
+ public void run() {
+ ctx.getMessageBroker().receivedMessage(data, nodeId);
+ }
+ }).start();
+ } catch (IOException e) {
+ LOGGER.log(Level.WARNING, "Error in stats reporting", e);
+ } catch (ClassNotFoundException e) {
+ Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in stats reporting", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "nodeID: " + nodeId;
+ }
+
+}
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 9f29fbd..fc755a5 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -54,6 +55,7 @@
protected Properties deploymentDescriptor;
protected IBootstrap bootstrap;
protected Serializable distributedState;
+ protected IMessageBroker messageBroker;
public ApplicationContext(ServerContext serverCtx, String appName) throws IOException {
this.serverCtx = serverCtx;
@@ -205,4 +207,14 @@
public ApplicationStatus getStatus() {
return status;
}
+
+ @Override
+ public void setMessageBroker(IMessageBroker messageBroker) {
+ this.messageBroker = messageBroker;
+ }
+
+ @Override
+ public IMessageBroker getMessageBroker() {
+ return this.messageBroker;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index a03f5c9..a25250a 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.messages.IMessage;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
@@ -47,4 +48,6 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
+
+ public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index f45f449..e5f0efc 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -74,10 +74,47 @@
CREATE_APPLICATION,
DESTROY_APPLICATION,
REPORT_PARTITION_AVAILABILITY,
+ SEND_APPLICATION_MESSAGE,
OTHER
}
+ public static class SendApplicationMessageFunction extends Function {
+ private static final long serialVersionUID = 1L;
+ private byte[] serializedMessage;
+ private String nodeId;
+ private String appName;
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public byte[] getMessage() {
+ return serializedMessage;
+ }
+
+ public SendApplicationMessageFunction(byte[] data, String appName, String nodeId) {
+ super();
+ this.serializedMessage = data;
+ this.nodeId = nodeId;
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.SEND_APPLICATION_MESSAGE;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ }
+
public static abstract class Function implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index a0dabdd..d789768 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -43,44 +43,40 @@
@Override
public void unregisterNode(String nodeId) throws Exception {
- CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
- nodeId);
+ CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(nodeId);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
- CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
- jobId, taskId, nodeId, statistics);
+ CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
+ nodeId, statistics);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
- CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
- jobId, taskId, nodeId, details);
+ CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
+ details);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
- CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
- jobId, nodeId);
+ CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
ipcHandle.send(-1, fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
- hbData);
+ CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id, hbData);
ipcHandle.send(-1, fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
- profiles);
+ CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id, profiles);
ipcHandle.send(-1, fn, null);
}
@@ -104,4 +100,10 @@
nodeId, appName, status);
ipcHandle.send(-1, fn, null);
}
+
+ @Override
+ public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
+ CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, appName, nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3c32d8b..3cab638 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -62,6 +62,7 @@
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
@@ -367,6 +368,12 @@
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
switch (fn.getFunctionId()) {
+ case SEND_APPLICATION_MESSAGE: {
+ CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
+ .getAppName(), amf.getNodeId()));
+ return;
+ }
case START_TASKS: {
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
@@ -416,4 +423,8 @@
}
}
+
+ public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
+ ccs.sendApplicationMessageToCC(data, appName, nodeId);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 7addda8..91e1d51 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -87,7 +87,9 @@
private volatile boolean aborted;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
+ private NodeControllerService ncs;
+
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs) {
this.joblet = joblet;
this.taskAttemptId = taskId;
this.displayName = displayName;
@@ -101,6 +103,7 @@
failed = false;
errorBaos = new ByteArrayOutputStream();
errorWriter = new PrintWriter(errorBaos, true);
+ this.ncs = ncs;
}
public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -340,4 +343,9 @@
public IStateObject getStateObject(Object id) {
return opEnv.getStateObject(id);
}
+
+ @Override
+ public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
+ this.ncs.sendApplicationMessageToCC(message, this.getJobletContext().getApplicationContext().getApplicationName(), nodeId);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
new file mode 100644
index 0000000..deb1b75
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+/**
+ * @author rico
+ *
+ */
+public class ApplicationMessageWork extends AbstractWork {
+
+ private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
+ private byte[] message;
+ private String nodeId;
+ private NodeControllerService ncs;
+ private String appName;
+
+ public ApplicationMessageWork(NodeControllerService ncs, byte[] message, String appName, String nodeId) {
+ this.ncs = ncs;
+ this.nodeId = nodeId;
+ this.message = message;
+ this.appName = appName;
+ }
+
+ @Override
+ public void run() {
+
+ NCApplicationContext ctx = ncs.getApplications().get(appName);
+ try {
+ IMessage data = (IMessage) ctx.deserialize(message);
+ if (ctx.getMessageBroker() != null) {
+ ctx.getMessageBroker().receivedMessage(data, nodeId);
+ } else {
+ LOGGER.log(Level.WARNING, "Messsage was sent, but no Message Broker set!");
+ }
+ } catch (IOException e) {
+ Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+ } catch (ClassNotFoundException e) {
+ Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "nodeID: " + nodeId;
+ }
+
+}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index eb982df..159ecb0 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -18,6 +18,7 @@
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
@@ -86,6 +87,7 @@
.notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
} catch (Exception e) {
LOGGER.warning("Error creating application: " + e.getMessage());
+ LOGGER.log(Level.WARNING, e.getLocalizedMessage(), e);
}
}
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 368539c..78101bf 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -108,7 +108,7 @@
LOGGER.info("Initializing " + taId + " -> " + han);
}
final int partition = tid.getPartition();
- Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor());
+ Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs);
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
diff --git a/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/algorithms/BinarySearchAlgorithm.java b/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/algorithms/BinarySearchAlgorithm.java
index a50fc92..e631428 100644
--- a/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/algorithms/BinarySearchAlgorithm.java
+++ b/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/algorithms/BinarySearchAlgorithm.java
@@ -51,8 +51,10 @@
int cmp = key.compareTo(vector.getBytes(index), vector.getStart(index), vector.getLength(index));
if (cmp > 0) {
left = index + 1;
+ index = left;
} else if (cmp < 0) {
right = index - 1;
+ index = left;
} else {
return true;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 79e4e65..b51132e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -111,13 +111,32 @@
for (int i = 0; i < framesLimit - 1; ++i) {
inFrames.add(ctx.allocateFrame());
}
- while (runs.size() > 0) {
- try {
- doPass(runs);
- } catch (Exception e) {
- throw new HyracksDataException(e);
+ int maxMergeWidth = framesLimit - 1;
+ while (runs.size() > maxMergeWidth) {
+ int generationSeparator = 0;
+ while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
+ int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
+ runs.size() - maxMergeWidth + 1);
+ FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
+ .getSimpleName());
+ IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
+ mergeResultWriter.open();
+ IFrameReader[] runCursors = new RunFileReader[mergeWidth];
+ for (int i = 0; i < mergeWidth; i++) {
+ runCursors[i] = runs.get(generationSeparator + i);
+ }
+ merge(mergeResultWriter, runCursors);
+ runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
+ runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
}
}
+ if (!runs.isEmpty()) {
+ IFrameReader[] runCursors = new RunFileReader[runs.size()];
+ for (int i = 0; i < runCursors.length; i++) {
+ runCursors[i] = runs.get(i);
+ }
+ merge(writer, runCursors);
+ }
}
} catch (Exception e) {
writer.fail();
@@ -127,44 +146,16 @@
}
}
- // creates a new run from runs that can fit in memory.
- private void doPass(List<IFrameReader> runs) throws HyracksDataException {
- FileReference newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
- if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
- finalPass = true;
- for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
- inFrames.remove(i);
- }
- } else {
- newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
- writer = new RunFileWriter(newRun, ctx.getIOManager());
- writer.open();
- }
+ private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
+ RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, comparators,
+ recordDesc);
+ merger.open();
try {
- IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
- runCursors[i] = runs.get(i);
- }
- RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
- comparators, recordDesc);
- merger.open();
- try {
- while (merger.nextFrame(outFrame)) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- } finally {
- merger.close();
- }
- runs.subList(0, inFrames.size()).clear();
- if (!finalPass) {
- runs.add(0, ((RunFileWriter) writer).createReader());
+ while (merger.nextFrame(outFrame)) {
+ FrameUtils.flushFrame(outFrame, mergeResultWriter);
}
} finally {
- if (!finalPass) {
- writer.close();
- }
+ merger.close();
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index c014ed1..29c28a1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -54,11 +54,11 @@
@Override
public void open() throws HyracksDataException {
- tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ tupleAccessors = new FrameTupleAccessor[runCursors.length];
Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, inFrames.size(), comparator);
- tupleIndexes = new int[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
+ topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
+ tupleIndexes = new int[runCursors.length];
+ for (int i = 0; i < runCursors.length; i++) {
tupleIndexes[i] = 0;
int runIndex = topTuples.peek().getRunid();
runCursors[runIndex].open();
@@ -98,7 +98,7 @@
@Override
public void close() throws HyracksDataException {
- for (int i = 0; i < inFrames.size(); ++i) {
+ for (int i = 0; i < runCursors.length; ++i) {
closeRun(i, runCursors, tupleAccessors);
}
}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 2d747f9..4a1921e 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -58,7 +58,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider,
- lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive);
+ return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, lowKeyFields,
+ highKeyFields, lowKeyInclusive, highKeyInclusive);
}
-}
\ No newline at end of file
+}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 1b95ccb..0bd872c 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
public class TestNCApplicationContext implements INCApplicationContext {
private final IHyracksRootContext rootCtx;
@@ -60,4 +61,22 @@
public Object getApplicationObject() {
return appObject;
}
+
+ @Override
+ public String getApplicationName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setMessageBroker(IMessageBroker staticticsConnector) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public IMessageBroker getMessageBroker() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
\ No newline at end of file
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index b776f55..e18c0b8 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -99,4 +99,10 @@
public IStateObject getStateObject(Object id) {
return null;
}
+
+ @Override
+ public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
}
\ No newline at end of file