merged hyracks_asterix_stabilization r1683:1702
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1704 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index c99d621..165fccd 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -89,8 +89,4 @@
public IPhysicalPropertiesVector getDeliveredPhysicalProperties();
public void computeDeliveredPhysicalProperties(IOptimizationContext context) throws AlgebricksException;
-
- public void setHostQueryContext(Object context);
-
- public Object getHostQueryContext();
}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
index c406f2b..de9cf29 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
@@ -30,25 +30,24 @@
public static final String ALGEBRICKS_NS = "algebricks";
// comparisons
- public final static FunctionIdentifier EQ = new FunctionIdentifier(ALGEBRICKS_NS, "eq", 2, true);
- public final static FunctionIdentifier LE = new FunctionIdentifier(ALGEBRICKS_NS, "le", 2, true);
- public final static FunctionIdentifier GE = new FunctionIdentifier(ALGEBRICKS_NS, "ge", 2, true);
- public final static FunctionIdentifier LT = new FunctionIdentifier(ALGEBRICKS_NS, "lt", 2, true);
- public final static FunctionIdentifier GT = new FunctionIdentifier(ALGEBRICKS_NS, "gt", 2, true);
- public final static FunctionIdentifier NEQ = new FunctionIdentifier(ALGEBRICKS_NS, "neq", 2, true);
+ public final static FunctionIdentifier EQ = new FunctionIdentifier(ALGEBRICKS_NS, "eq", 2);
+ public final static FunctionIdentifier LE = new FunctionIdentifier(ALGEBRICKS_NS, "le", 2);
+ public final static FunctionIdentifier GE = new FunctionIdentifier(ALGEBRICKS_NS, "ge", 2);
+ public final static FunctionIdentifier LT = new FunctionIdentifier(ALGEBRICKS_NS, "lt", 2);
+ public final static FunctionIdentifier GT = new FunctionIdentifier(ALGEBRICKS_NS, "gt", 2);
+ public final static FunctionIdentifier NEQ = new FunctionIdentifier(ALGEBRICKS_NS, "neq", 2);
// booleans
- public final static FunctionIdentifier NOT = new FunctionIdentifier(ALGEBRICKS_NS, "not", 1, true);
+ public final static FunctionIdentifier NOT = new FunctionIdentifier(ALGEBRICKS_NS, "not", 1);
public final static FunctionIdentifier AND = new FunctionIdentifier(ALGEBRICKS_NS, "and",
- FunctionIdentifier.VARARGS, true);
- public final static FunctionIdentifier OR = new FunctionIdentifier(ALGEBRICKS_NS, "or", FunctionIdentifier.VARARGS,
- true);
+ FunctionIdentifier.VARARGS);
+ public final static FunctionIdentifier OR = new FunctionIdentifier(ALGEBRICKS_NS, "or", FunctionIdentifier.VARARGS);
// numerics
- public final static FunctionIdentifier NUMERIC_ADD = new FunctionIdentifier(ALGEBRICKS_NS, "numeric-add", 2, true);
+ public final static FunctionIdentifier NUMERIC_ADD = new FunctionIdentifier(ALGEBRICKS_NS, "numeric-add", 2);
// nulls
- public final static FunctionIdentifier IS_NULL = new FunctionIdentifier(ALGEBRICKS_NS, "is-null", 1, true);
+ public final static FunctionIdentifier IS_NULL = new FunctionIdentifier(ALGEBRICKS_NS, "is-null", 1);
private static final Map<FunctionIdentifier, ComparisonKind> comparisonFunctions = new HashMap<FunctionIdentifier, ComparisonKind>();
static {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
index 252f23c..548f646 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
@@ -14,36 +14,31 @@
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.functions;
-public final class FunctionIdentifier {
- final private String namespace;
- final private String name;
- final private boolean isBuiltin;
- final private int arity;
+import java.io.Serializable;
+
+public final class FunctionIdentifier implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String namespace;
+ private final String name;
+ private final int arity;
public final static int VARARGS = -1;
- public FunctionIdentifier(String namespace, String name, boolean isBuiltin) {
- this.namespace = namespace;
- this.name = name;
- this.arity = VARARGS;
- this.isBuiltin = isBuiltin;
+ public FunctionIdentifier(String namespace, String name) {
+ this(namespace, name, VARARGS);
}
- public FunctionIdentifier(String namespace, String name, int arity, boolean isBuiltin) {
+ public FunctionIdentifier(String namespace, String name, int arity) {
this.namespace = namespace;
this.name = name;
this.arity = arity;
- this.isBuiltin = isBuiltin;
}
public String getName() {
return name;
}
- public boolean isBuiltin() {
- return isBuiltin;
- }
-
@Override
public boolean equals(Object o) {
if (super.equals(o)) {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 4acd53b..dc0edfe 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -59,7 +59,6 @@
protected IPhysicalOperator physicalOperator;
private final Map<String, Object> annotations = new HashMap<String, Object>();
private boolean bJobGenEnabled = true;
- private Object hostQueryContext;
final protected List<Mutable<ILogicalOperator>> inputs;
// protected List<LogicalOperatorReference> outputs;
@@ -168,14 +167,6 @@
return bJobGenEnabled;
}
- public void setHostQueryContext(Object context) {
- this.hostQueryContext = context;
- }
-
- public Object getHostQueryContext() {
- return hostQueryContext;
- }
-
@Override
public IVariableTypeEnvironment computeInputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
return createPropagatingAllInputsTypeEnvironment(ctx);
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index 9f75bbc..f314e09 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -18,6 +18,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
@@ -83,33 +85,34 @@
private class PartitionWriter implements Runnable, IInputChannelMonitor {
private PartitionChannel pc;
- private int nAvailableFrames;
+ private final AtomicInteger nAvailableFrames;
- private boolean eos;
+ private final AtomicBoolean eos;
- private boolean failed;
+ private final AtomicBoolean failed;
public PartitionWriter(PartitionChannel pc) {
this.pc = pc;
- nAvailableFrames = 0;
- eos = false;
+ nAvailableFrames = new AtomicInteger(0);
+ eos = new AtomicBoolean(false);
+ failed = new AtomicBoolean(false);
}
@Override
public synchronized void notifyFailure(IInputChannel channel) {
- failed = true;
+ failed.set(true);
notifyAll();
}
@Override
public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
- nAvailableFrames += nFrames;
+ nAvailableFrames.addAndGet(nFrames);
notifyAll();
}
@Override
public synchronized void notifyEndOfStream(IInputChannel channel) {
- eos = true;
+ eos.set(true);
notifyAll();
}
@@ -123,26 +126,22 @@
channel.open();
mpw.open();
while (true) {
- int nAvailableFrames;
- boolean eos;
- boolean failed;
- synchronized (this) {
- nAvailableFrames = this.nAvailableFrames;
- eos = this.eos;
- failed = this.failed;
- }
- if (nAvailableFrames > 0) {
+ if (nAvailableFrames.get() > 0) {
ByteBuffer buffer = channel.getNextBuffer();
- --nAvailableFrames;
+ nAvailableFrames.decrementAndGet();
mpw.nextFrame(buffer);
channel.recycleBuffer(buffer);
- } else if (eos) {
+ } else if (eos.get()) {
break;
- } else if (failed) {
+ } else if (failed.get()) {
throw new HyracksDataException("Failure occurred on input");
} else {
try {
- wait();
+ synchronized (this) {
+ if (nAvailableFrames.get() <= 0 && !eos.get() && !failed.get()) {
+ wait();
+ }
+ }
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}