merged hyracks_asterix_stabilization r1596:1599
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1606 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/hyracks-algebricks/hyracks-algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index de22f52..f1e7acb 100644
--- a/hyracks-algebricks/hyracks-algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/hyracks-algebricks/hyracks-algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -27,10 +27,10 @@
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
@@ -45,8 +45,8 @@
protected ISerializerDeserializerProvider serializerDeserializerProvider;
protected IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
protected IBinaryComparatorFactoryProvider comparatorFactoryProvider;
- protected IBinaryBooleanInspector binaryBooleanInspector;
- protected IBinaryIntegerInspector binaryIntegerInspector;
+ protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+ protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
protected IPrinterFactoryProvider printerProvider;
protected IExpressionRuntimeProvider expressionRuntimeProvider;
protected IExpressionTypeComputer expressionTypeComputer;
@@ -102,20 +102,20 @@
return comparatorFactoryProvider;
}
- public void setBinaryBooleanInspector(IBinaryBooleanInspector binaryBooleanInspector) {
- this.binaryBooleanInspector = binaryBooleanInspector;
+ public void setBinaryBooleanInspectorFactory(IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
+ this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
}
- public IBinaryBooleanInspector getBinaryBooleanInspector() {
- return binaryBooleanInspector;
+ public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+ return binaryBooleanInspectorFactory;
}
- public void setBinaryIntegerInspector(IBinaryIntegerInspector binaryIntegerInspector) {
- this.binaryIntegerInspector = binaryIntegerInspector;
+ public void setBinaryIntegerInspectorFactory(IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
+ this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
}
- public IBinaryIntegerInspector getBinaryIntegerInspector() {
- return binaryIntegerInspector;
+ public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+ return binaryIntegerInspectorFactory;
}
public void setPrinterProvider(IPrinterFactoryProvider printerProvider) {
diff --git a/hyracks-algebricks/hyracks-algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-algebricks/hyracks-algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index c4c362b..1d21463 100644
--- a/hyracks-algebricks/hyracks-algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-algebricks/hyracks-algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -85,10 +85,10 @@
AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
JobGenContext context = new JobGenContext(null, metadata, appContext,
serializerDeserializerProvider, hashFunctionFactoryProvider, comparatorFactoryProvider,
- typeTraitProvider, binaryBooleanInspector, binaryIntegerInspector, printerProvider,
- nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
- expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
- partialAggregationTypeComputer, frameSize, clusterLocations);
+ typeTraitProvider, binaryBooleanInspectorFactory, binaryIntegerInspectorFactory,
+ printerProvider, nullWriterFactory, normalizedKeyComputerFactoryProvider,
+ expressionRuntimeProvider, expressionTypeComputer, nullableTypeComputer, oc,
+ expressionEvalSizeComputer, partialAggregationTypeComputer, frameSize, clusterLocations);
PlanCompiler pc = new PlanCompiler(context);
return pc.compilePlan(plan, null);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
index 73cc42e..3311fac 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -81,7 +82,7 @@
}
@Override
- public IScalarEvaluator createScalarEvaluator() throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
final ICopyEvaluator ce = cef.createEvaluator(abvs);
return new IScalarEvaluator() {
@@ -105,7 +106,7 @@
}
@Override
- public IAggregateEvaluator createAggregateEvaluator() throws AlgebricksException {
+ public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
final ICopyAggregateFunction caf = caff.createAggregateFunction(abvs);
return new IAggregateEvaluator() {
@@ -168,7 +169,7 @@
}
@Override
- public IUnnestingEvaluator createUnnestingEvaluator() throws AlgebricksException {
+ public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
final ICopyUnnestingFunction cuf = cuff.createUnnestingFunction(abvs);
return new IUnnestingEvaluator() {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index d3ec3bf..aa8e560 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -38,9 +38,11 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -127,7 +129,7 @@
IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
context.getTypeEnvironment(op), conditionInputSchemas, context);
ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
- context.getBinaryBooleanInspector());
+ context.getBinaryBooleanInspectorFactory());
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IOperatorDescriptor opDesc = null;
@@ -153,22 +155,22 @@
private static final long serialVersionUID = 1L;
private final IScalarEvaluatorFactory cond;
- private final IBinaryBooleanInspector binaryBooleanInspector;
+ private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
- public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond, IBinaryBooleanInspector binaryBooleanInspector) {
+ public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
+ IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
this.cond = cond;
- this.binaryBooleanInspector = binaryBooleanInspector;
+ this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
}
@Override
- public synchronized ITuplePairComparator createTuplePairComparator() {
- return new TuplePairEvaluator(cond, binaryBooleanInspector);
+ public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+ return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
}
-
}
public static class TuplePairEvaluator implements ITuplePairComparator {
-
+ private final IHyracksTaskContext ctx;
private IScalarEvaluator condEvaluator;
private final IScalarEvaluatorFactory condFactory;
private final IPointable p;
@@ -177,7 +179,9 @@
private final FrameTupleReference rightRef;
private final IBinaryBooleanInspector binaryBooleanInspector;
- public TuplePairEvaluator(IScalarEvaluatorFactory condFactory, IBinaryBooleanInspector binaryBooleanInspector) {
+ public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
+ IBinaryBooleanInspector binaryBooleanInspector) {
+ this.ctx = ctx;
this.condFactory = condFactory;
this.binaryBooleanInspector = binaryBooleanInspector;
this.leftRef = new FrameTupleReference();
@@ -191,7 +195,7 @@
int innerIndex) throws HyracksDataException {
if (condEvaluator == null) {
try {
- this.condEvaluator = condFactory.createScalarEvaluator();
+ this.condEvaluator = condFactory.createScalarEvaluator(ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
index 294c8ce..c0e4b69 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
@@ -73,7 +73,7 @@
.getAfterObjects().getValue(), env, inputSchemas, context);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(op, propagatedSchema, context);
StreamDieRuntimeFactory runtime = new StreamDieRuntimeFactory(afterObjectsFact, null,
- context.getBinaryIntegerInspector());
+ context.getBinaryIntegerInspectorFactory());
builder.contributeMicroOperator(die, runtime, recDesc);
ILogicalOperator src = die.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, die, 0);
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index 37328d3..34127aa 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -85,7 +85,7 @@
.createEvaluatorFactory(offsetExpr, env, inputSchemas, context);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(op, propagatedSchema, context);
StreamLimitRuntimeFactory runtime = new StreamLimitRuntimeFactory(maxObjectsFact, offsetFact, null,
- context.getBinaryIntegerInspector());
+ context.getBinaryIntegerInspectorFactory());
builder.contributeMicroOperator(limit, runtime, recDesc);
// and contribute one edge from its child
ILogicalOperator src = limit.getInputs().get(0).getValue();
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
index 1cb0fa3..891f72a 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -63,7 +63,7 @@
IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(select.getCondition()
.getValue(), context.getTypeEnvironment(op), inputSchemas, context);
StreamSelectRuntimeFactory runtime = new StreamSelectRuntimeFactory(cond, null,
- context.getBinaryBooleanInspector());
+ context.getBinaryBooleanInspectorFactory());
// contribute one Asterix framewriter
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(op, opSchema, context);
builder.contributeMicroOperator(select, runtime, recDesc);
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 95ca8c2..22a1a81 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -31,10 +31,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
@@ -53,8 +53,8 @@
private final INullWriterFactory nullWriterFactory;
private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
private final Object appContext;
- private final IBinaryBooleanInspector booleanInspector;
- private final IBinaryIntegerInspector integerInspector;
+ private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+ private final IBinaryIntegerInspectorFactory integerInspectorFactory;
private final IExpressionRuntimeProvider expressionRuntimeProvider;
private final IExpressionTypeComputer expressionTypeComputer;
private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
@@ -68,8 +68,9 @@
ISerializerDeserializerProvider serializerDeserializerProvider,
IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
- IBinaryBooleanInspector booleanInspector, IBinaryIntegerInspector integerInspector,
- IPrinterFactoryProvider printerFactoryProvider, INullWriterFactory nullWriterFactory,
+ IBinaryBooleanInspectorFactory booleanInspectorFactory,
+ IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
+ INullWriterFactory nullWriterFactory,
INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
@@ -83,8 +84,8 @@
this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
this.comparatorFactoryProvider = comparatorFactoryProvider;
this.typeTraitProvider = typeTraitProvider;
- this.booleanInspector = booleanInspector;
- this.integerInspector = integerInspector;
+ this.booleanInspectorFactory = booleanInspectorFactory;
+ this.integerInspectorFactory = integerInspectorFactory;
this.printerFactoryProvider = printerFactoryProvider;
this.clusterLocations = clusterLocations;
this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
@@ -130,12 +131,12 @@
return typeTraitProvider;
}
- public IBinaryBooleanInspector getBinaryBooleanInspector() {
- return booleanInspector;
+ public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+ return booleanInspectorFactory;
}
- public IBinaryIntegerInspector getBinaryIntegerInspector() {
- return integerInspector;
+ public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+ return integerInspectorFactory;
}
public IPrinterFactoryProvider getPrinterFactoryProvider() {
diff --git a/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryBooleanInspector.java b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryBooleanInspector.java
index 88ff710..a1b7086 100644
--- a/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryBooleanInspector.java
+++ b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryBooleanInspector.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.algebricks.data;
-import java.io.Serializable;
-
-public interface IBinaryBooleanInspector extends Serializable {
+public interface IBinaryBooleanInspector {
public boolean getBooleanValue(byte[] bytes, int offset, int length);
-}
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryBooleanInspectorFactory.java
similarity index 63%
copy from hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java
copy to hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryBooleanInspectorFactory.java
index 8a1cf0a..23d96ca 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java
+++ b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryBooleanInspectorFactory.java
@@ -12,21 +12,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.algebricks.runtime.context;
+package edu.uci.ics.hyracks.algebricks.data;
+
+import java.io.Serializable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-public class RuntimeContext {
- private IHyracksTaskContext hyracksContext;
-
- public RuntimeContext() {
- }
-
- public IHyracksTaskContext getHyracksContext() {
- return hyracksContext;
- }
-
- public void setHyracksContext(IHyracksTaskContext hyracksContext) {
- this.hyracksContext = hyracksContext;
- }
+public interface IBinaryBooleanInspectorFactory extends Serializable {
+ public IBinaryBooleanInspector createBinaryBooleanInspector(IHyracksTaskContext ctx);
}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryIntegerInspector.java b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryIntegerInspector.java
index 24ba948..24a7eb1 100644
--- a/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryIntegerInspector.java
+++ b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryIntegerInspector.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.algebricks.data;
-import java.io.Serializable;
-
-public interface IBinaryIntegerInspector extends Serializable {
+public interface IBinaryIntegerInspector {
public int getIntegerValue(byte[] bytes, int offset, int length);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryIntegerInspectorFactory.java
similarity index 63%
rename from hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java
rename to hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryIntegerInspectorFactory.java
index 8a1cf0a..8760992 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java
+++ b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryIntegerInspectorFactory.java
@@ -12,21 +12,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.algebricks.runtime.context;
+package edu.uci.ics.hyracks.algebricks.data;
+
+import java.io.Serializable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-public class RuntimeContext {
- private IHyracksTaskContext hyracksContext;
-
- public RuntimeContext() {
- }
-
- public IHyracksTaskContext getHyracksContext() {
- return hyracksContext;
- }
-
- public void setHyracksContext(IHyracksTaskContext hyracksContext) {
- this.hyracksContext = hyracksContext;
- }
+public interface IBinaryIntegerInspectorFactory extends Serializable {
+ public IBinaryIntegerInspector createBinaryIntegerInspector(IHyracksTaskContext ctx);
}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/impl/BinaryBooleanInspectorImpl.java b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/impl/BinaryBooleanInspectorImpl.java
index 0d1d444..b9bbb7d 100644
--- a/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/impl/BinaryBooleanInspectorImpl.java
+++ b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/impl/BinaryBooleanInspectorImpl.java
@@ -15,12 +15,18 @@
package edu.uci.ics.hyracks.algebricks.data.impl;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public class BinaryBooleanInspectorImpl implements IBinaryBooleanInspector {
+ public static final IBinaryBooleanInspectorFactory FACTORY = new IBinaryBooleanInspectorFactory() {
+ private static final long serialVersionUID = 1L;
- private static final long serialVersionUID = 1L;
-
- public static final BinaryBooleanInspectorImpl INSTANCE = new BinaryBooleanInspectorImpl();
+ @Override
+ public IBinaryBooleanInspector createBinaryBooleanInspector(IHyracksTaskContext ctx) {
+ return new BinaryBooleanInspectorImpl();
+ }
+ };
private BinaryBooleanInspectorImpl() {
}
@@ -29,5 +35,4 @@
public boolean getBooleanValue(byte[] bytes, int offset, int length) {
return bytes[offset] == 1;
}
-
-}
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/impl/BinaryIntegerInspectorImpl.java b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/impl/BinaryIntegerInspectorImpl.java
index e1016cf..2fc170f 100644
--- a/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/impl/BinaryIntegerInspectorImpl.java
+++ b/hyracks-algebricks/hyracks-algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/impl/BinaryIntegerInspectorImpl.java
@@ -15,16 +15,25 @@
package edu.uci.ics.hyracks.algebricks.data.impl;
import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
public class BinaryIntegerInspectorImpl implements IBinaryIntegerInspector {
+ public static final IBinaryIntegerInspectorFactory FACTORY = new IBinaryIntegerInspectorFactory() {
+ private static final long serialVersionUID = 1L;
- private static final long serialVersionUID = 1L;
- public static final BinaryIntegerInspectorImpl INSTANCE = new BinaryIntegerInspectorImpl();
+ @Override
+ public IBinaryIntegerInspector createBinaryIntegerInspector(IHyracksTaskContext ctx) {
+ return new BinaryIntegerInspectorImpl();
+ }
+ };
+
+ private BinaryIntegerInspectorImpl() {
+ }
@Override
public int getIntegerValue(byte[] bytes, int offset, int length) {
return IntegerPointable.getInteger(bytes, offset);
}
-
-}
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
index 8d58f0d..64bb310 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
@@ -5,6 +5,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -14,7 +15,7 @@
private static final long serialVersionUID = 1L;
@Override
- public IAggregateEvaluator createAggregateEvaluator() throws AlgebricksException {
+ public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
return new IAggregateEvaluator() {
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
index efc8a9d..83bfbf7 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
@@ -17,7 +17,8 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public interface IAggregateEvaluatorFactory extends Serializable {
- public IAggregateEvaluator createAggregateEvaluator() throws AlgebricksException;
+ public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index 3e349e1..f1d01e6 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -17,8 +17,8 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public interface IPushRuntimeFactory extends Serializable {
- public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException;
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException;
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
index dcf7d35..e40b96c 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
@@ -17,7 +17,8 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public interface IScalarEvaluatorFactory extends Serializable {
- public IScalarEvaluator createScalarEvaluator() throws AlgebricksException;
+ public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
index 80157ff..8bca2b9 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
@@ -17,7 +17,8 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public interface IUnnestingEvaluatorFactory extends Serializable {
- public IUnnestingEvaluator createUnnestingEvaluator() throws AlgebricksException;
+ public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
index a55e226..4f7c35a 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -35,7 +36,7 @@
}
@Override
- public IScalarEvaluator createScalarEvaluator() throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
return new IScalarEvaluator() {
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
index 5e75810..29328ff 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
@@ -3,6 +3,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -16,7 +17,7 @@
}
@Override
- public IScalarEvaluator createScalarEvaluator() throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
return new IScalarEvaluator() {
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index befc253..a975195 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -19,9 +19,9 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -56,7 +56,7 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
return new AbstractOneInputOneOutputOneFramePushRuntime() {
@@ -71,9 +71,9 @@
try {
if (first) {
first = false;
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
for (int i = 0; i < aggregFactories.length; i++) {
- aggregs[i] = aggregFactories[i].createAggregateEvaluator();
+ aggregs[i] = aggregFactories[i].createAggregateEvaluator(ctx);
}
}
for (int i = 0; i < aggregFactories.length; i++) {
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index dfd9ec7..2ae6f72 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -20,7 +20,6 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -51,14 +50,12 @@
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
- final RuntimeContext rc = new RuntimeContext();
- rc.setHyracksContext(ctx);
final AggregatorOutput outputWriter = new AggregatorOutput(ctx.getFrameSize(), subplans, keyFieldIdx.length,
decorFieldIdx.length);
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
try {
- pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, rc);
+ pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
@@ -143,14 +140,14 @@
};
}
- private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, RuntimeContext rc)
+ private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
throws AlgebricksException {
// plug the operators
IFrameWriter start = writer;
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
- IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(rc);
+ IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 30eebaa..83925cc 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -40,7 +40,7 @@
}
@Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+ public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
return new IAggregatorDescriptor() {
@@ -105,7 +105,7 @@
IAggregateEvaluator[] agg = new IAggregateEvaluator[aggFactories.length];
for (int i = 0; i < agg.length; i++) {
try {
- agg[i] = aggFactories[i].createAggregateEvaluator();
+ agg[i] = aggFactories[i].createAggregateEvaluator(ctx);
} catch (AlgebricksException e) {
throw new IllegalStateException(e);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 0ff1b89..c2d48b0 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -16,7 +16,6 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -74,18 +73,17 @@
}
}
- protected final void initAccessAppend(RuntimeContext context) {
- IHyracksTaskContext hCtx = context.getHyracksContext();
+ protected final void initAccessAppend(IHyracksTaskContext ctx) {
// if (allocFrame) {
- frame = hCtx.allocateFrame();
- appender = new FrameTupleAppender(hCtx.getFrameSize());
+ frame = ctx.allocateFrame();
+ appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(frame, true);
// }
- tAccess = new FrameTupleAccessor(hCtx.getFrameSize(), inputRecordDesc);
+ tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
}
- protected final void initAccessAppendRef(RuntimeContext context) {
- initAccessAppend(context);
+ protected final void initAccessAppendRef(IHyracksTaskContext ctx) {
+ initAccessAppend(ctx);
tRef = new FrameTupleReference();
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 81db95e..f2c1008 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -17,7 +17,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRuntimeFactory {
@@ -30,11 +30,11 @@
}
@Override
- public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
- return createOneOutputPushRuntime(context);
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+ return createOneOutputPushRuntime(ctx);
}
- public abstract AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(RuntimeContext context)
+ public abstract AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx)
throws AlgebricksException;
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index 81e91f6..f175367 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -19,7 +19,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class SinkRuntimeFactory implements IPushRuntimeFactory {
@@ -35,7 +35,7 @@
}
@Override
- public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
return new AbstractOneInputSinkPushRuntime() {
@Override
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index ea83fdd..388e6c9 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -4,7 +4,6 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -43,14 +42,13 @@
}
@Override
- public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(RuntimeContext context)
+ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
try {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final IHyracksTaskContext ctx = context.getHyracksContext();
final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
outRecordDesc, groupFields, groupFields);
final ByteBuffer copyFrame = ctx.allocateFrame();
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 91fa8a1..e978ade 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -98,9 +97,7 @@
PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
pipelineOutputRecordDescriptor);
try {
- RuntimeContext rc = new RuntimeContext();
- rc.setHyracksContext(ctx);
- startOfPipeline = pa.assemblePipeline(writer, rc);
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
@@ -126,9 +123,7 @@
PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
try {
- RuntimeContext rc = new RuntimeContext();
- rc.setHyracksContext(ctx);
- startOfPipeline = pa.assemblePipeline(writer, rc);
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 1041438..92410c2 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -17,8 +17,8 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
public class PipelineAssembler {
@@ -40,11 +40,11 @@
this.outputArity = outputArity;
}
- public IFrameWriter assemblePipeline(IFrameWriter writer, RuntimeContext rc) throws AlgebricksException {
+ public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws AlgebricksException {
// plug the operators
IFrameWriter start = writer;// this.writer;
for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
- IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(rc);
+ IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
if (i == pipeline.getRuntimeFactories().length - 1) {
if (outputArity == 1) {
newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index c7adf1f..f1ee570 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -21,7 +21,6 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
@@ -67,7 +66,7 @@
}
@Override
- public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
RecordDescriptor pipelineOutputRecordDescriptor = null;
@@ -82,15 +81,13 @@
return new AbstractOneInputOneOutputOneFramePushRuntime() {
/**
- *
* Computes the outer product between a given tuple and the frames
* passed.
- *
*/
class TupleOuterProduct implements IFrameWriter {
private boolean smthWasWritten = false;
- private IHyracksTaskContext hCtx = context.getHyracksContext();
+ private IHyracksTaskContext hCtx = ctx;
private int frameSize = hCtx.getFrameSize();
private FrameTupleAccessor ta = new FrameTupleAccessor(frameSize,
pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
@@ -148,7 +145,7 @@
IFrameWriter endPipe = new TupleOuterProduct();
- NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, context);
+ NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
boolean first = true;
@@ -156,7 +153,7 @@
public void open() throws HyracksDataException {
if (first) {
first = false;
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
}
writer.open();
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 3fa3a50..fd2710d 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -18,9 +18,9 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -47,7 +47,7 @@
}
@Override
- public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
return new AbstractOneInputOneOutputPushRuntime() {
@@ -57,8 +57,8 @@
@Override
public void open() throws HyracksDataException {
if (frameSorter == null) {
- frameSorter = new FrameSorter(context.getHyracksContext(), sortFields, firstKeyNormalizerFactory,
- comparatorFactories, outputRecordDesc);
+ frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+ outputRecordDesc);
}
frameSorter.reset();
writer.open();
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index c7efc0a..e47384d 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -20,10 +20,10 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -73,7 +73,7 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
final int[] projectionToOutColumns = new int[projectionList.length];
for (int j = 0; j < projectionList.length; j++) {
@@ -89,12 +89,12 @@
@Override
public void open() throws HyracksDataException {
if (first) {
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
first = false;
int n = evalFactories.length;
for (int i = 0; i < n; i++) {
try {
- eval[i] = evalFactories[i].createScalarEvaluator();
+ eval[i] = evalFactories[i].createScalarEvaluator(ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 659ba74..dbe1ab1 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -18,7 +18,6 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -39,13 +38,12 @@
}
@Override
- public IPushRuntime createPushRuntime(final RuntimeContext context) {
+ public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) {
return new AbstractOneInputSourcePushRuntime() {
- private IHyracksTaskContext hCtx = context.getHyracksContext();
- private ByteBuffer frame = hCtx.allocateFrame();
+ private ByteBuffer frame = ctx.allocateFrame();
private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
- private FrameTupleAppender appender = new FrameTupleAppender(hCtx.getFrameSize());
+ private FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
@Override
public void open() throws HyracksDataException {
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 899147d..42724f7 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -18,8 +18,8 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
@@ -35,14 +35,14 @@
}
@Override
- public IPushRuntime createPushRuntime(RuntimeContext context) {
- return new NestedTupleSourceRuntime(context);
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+ return new NestedTupleSourceRuntime(ctx);
}
public static class NestedTupleSourceRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
- public NestedTupleSourceRuntime(RuntimeContext rc) {
- initAccessAppend(rc);
+ public NestedTupleSourceRuntime(IHyracksTaskContext ctx) {
+ initAccessAppend(ctx);
}
@Override
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
index d4040f2..5b53fc2 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -18,8 +18,8 @@
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
public class PrinterRuntimeFactory implements IPushRuntimeFactory {
@@ -52,9 +52,9 @@
}
@Override
- public IPushRuntime createPushRuntime(final RuntimeContext context) {
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
IAWriter w = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, printerFactories,
inputRecordDesc);
- return new SinkWriterRuntime(w, context, System.out, inputRecordDesc);
+ return new SinkWriterRuntime(w, ctx, System.out, inputRecordDesc);
}
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 2f69b0a..c15e050 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -20,10 +20,10 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -74,7 +74,7 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
final int[] projectionToOutColumns = new int[projectionList.length];
for (int j = 0; j < projectionList.length; j++) {
@@ -89,7 +89,7 @@
@Override
public void open() throws HyracksDataException {
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
if (first) {
first = false;
int n = runningAggregates.length;
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index 56a3f0e..148f087 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -19,15 +19,15 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IAWriter;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
- private final RuntimeContext context;
+ private final IHyracksTaskContext ctx;
private final PrintStream printStream;
private final IAWriter writer;
private RecordDescriptor inputRecordDesc;
@@ -35,18 +35,18 @@
private boolean autoClose = false;
private boolean first = true;
- public SinkWriterRuntime(IAWriter writer, RuntimeContext context, PrintStream printStream,
+ public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
RecordDescriptor inputRecordDesc) {
this.writer = writer;
- this.context = context;
+ this.ctx = ctx;
this.printStream = printStream;
this.inputRecordDesc = inputRecordDesc;
- this.tAccess = new FrameTupleAccessor(context.getHyracksContext().getFrameSize(), inputRecordDesc);
+ this.tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
}
- public SinkWriterRuntime(IAWriter writer, RuntimeContext context, PrintStream printStream,
+ public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
RecordDescriptor inputRecordDesc, boolean autoClose) {
- this(writer, context, printStream, inputRecordDesc);
+ this(writer, ctx, printStream, inputRecordDesc);
this.autoClose = autoClose;
}
@@ -54,7 +54,7 @@
public void open() throws HyracksDataException {
if (first) {
first = false;
- tAccess = new FrameTupleAccessor(context.getHyracksContext().getFrameSize(), inputRecordDesc);
+ tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
try {
writer.init();
} catch (AlgebricksException e) {
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
index 0641b0c..1aff1aa 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -26,7 +26,7 @@
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
@@ -63,7 +63,7 @@
}
@Override
- public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
PrintStream filePrintStream = null;
try {
filePrintStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
@@ -71,6 +71,6 @@
throw new AlgebricksException(e);
}
IAWriter w = writerFactory.createWriter(fields, filePrintStream, printerFactories, inputRecordDesc);
- return new SinkWriterRuntime(w, context, filePrintStream, inputRecordDesc, true);
+ return new SinkWriterRuntime(w, ctx, filePrintStream, inputRecordDesc, true);
}
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
index e6b0e2f..796ef0a 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
@@ -4,11 +4,12 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -18,13 +19,13 @@
private static final long serialVersionUID = 1L;
private IScalarEvaluatorFactory aftterObjectsEvalFactory;
- private IBinaryIntegerInspector binaryIntegerInspector;
+ private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
public StreamDieRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory, int[] projectionList,
- IBinaryIntegerInspector binaryIntegerInspector) {
+ IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
super(projectionList);
this.aftterObjectsEvalFactory = maxObjectsEvalFactory;
- this.binaryIntegerInspector = binaryIntegerInspector;
+ this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
}
@Override
@@ -34,7 +35,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+ final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
return new AbstractOneInputOneOutputOneFramePushRuntime() {
private IPointable p = VoidPointable.FACTORY.createPointable();
private IScalarEvaluator evalAfterObjects;
@@ -43,9 +45,9 @@
@Override
public void open() throws HyracksDataException {
if (evalAfterObjects == null) {
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
try {
- evalAfterObjects = aftterObjectsEvalFactory.createScalarEvaluator();
+ evalAfterObjects = aftterObjectsEvalFactory.createScalarEvaluator(ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
@@ -86,7 +88,7 @@
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
- int lim = binaryIntegerInspector.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+ int lim = bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
return lim;
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 3f8be24..4e9d51c 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -4,11 +4,12 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -19,15 +20,15 @@
private IScalarEvaluatorFactory maxObjectsEvalFactory;
private IScalarEvaluatorFactory offsetEvalFactory;
- private IBinaryIntegerInspector binaryIntegerInspector;
+ private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
public StreamLimitRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory,
IScalarEvaluatorFactory offsetEvalFactory, int[] projectionList,
- IBinaryIntegerInspector binaryIntegerInspector) {
+ IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
super(projectionList);
this.maxObjectsEvalFactory = maxObjectsEvalFactory;
this.offsetEvalFactory = offsetEvalFactory;
- this.binaryIntegerInspector = binaryIntegerInspector;
+ this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
}
@Override
@@ -41,7 +42,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+ final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
return new AbstractOneInputOneOutputOneFramePushRuntime() {
private IPointable p = VoidPointable.FACTORY.createPointable();
private IScalarEvaluator evalMaxObjects;
@@ -55,11 +57,11 @@
public void open() throws HyracksDataException {
// if (first) {
if (evalMaxObjects == null) {
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
try {
- evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator();
+ evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator(ctx);
if (offsetEvalFactory != null) {
- evalOffset = offsetEvalFactory.createScalarEvaluator();
+ evalOffset = offsetEvalFactory.createScalarEvaluator(ctx);
}
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
@@ -124,7 +126,7 @@
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
- int lim = binaryIntegerInspector.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+ int lim = bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
return lim;
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index e452059..455ad8e 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -18,9 +18,9 @@
import java.util.Arrays;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -37,7 +37,7 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
return new AbstractOneInputOneOutputOneFramePushRuntime() {
@@ -48,7 +48,7 @@
public void open() throws HyracksDataException {
if (first) {
first = false;
- initAccessAppend(context);
+ initAccessAppend(ctx);
}
writer.open();
}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index f5dbd95..b0c94b7 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -18,11 +18,12 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -33,7 +34,7 @@
private IScalarEvaluatorFactory cond;
- private IBinaryBooleanInspector binaryBooleanInspector;
+ private IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
/**
* @param cond
@@ -41,10 +42,10 @@
* if projectionList is null, then no projection is performed
*/
public StreamSelectRuntimeFactory(IScalarEvaluatorFactory cond, int[] projectionList,
- IBinaryBooleanInspector binaryBooleanInspector) {
+ IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
super(projectionList);
this.cond = cond;
- this.binaryBooleanInspector = binaryBooleanInspector;
+ this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
}
@Override
@@ -53,7 +54,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+ final IBinaryBooleanInspector bbi = binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
return new AbstractOneInputOneOutputOneFramePushRuntime() {
private IPointable p = VoidPointable.FACTORY.createPointable();
private IScalarEvaluator eval;
@@ -61,9 +63,9 @@
@Override
public void open() throws HyracksDataException {
if (eval == null) {
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
try {
- eval = cond.createScalarEvaluator();
+ eval = cond.createScalarEvaluator(ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
@@ -82,7 +84,7 @@
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
- if (binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) {
+ if (bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) {
if (projectionList != null) {
appendProjectionToFrame(t, projectionList);
} else {
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index cfc21ad..3f2765f 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -24,9 +24,9 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IPrinter;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -50,7 +50,7 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
final IPrinter[] printers = new IPrinter[printerFactories.length];
for (int i = 0; i < printerFactories.length; i++) {
@@ -125,11 +125,11 @@
public void open() throws HyracksDataException {
if (first) {
first = false;
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
}
try {
- ITupleParser parser = parserFactory.createTupleParser(context.getHyracksContext());
+ ITupleParser parser = parserFactory.createTupleParser(ctx);
process = Runtime.getRuntime().exec(command);
ps = new PrintStream(process.getOutputStream());
ForwardScriptOutput fso = new ForwardScriptOutput(parser, process.getInputStream());
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 8137240..d42a294 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -19,9 +19,9 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -60,7 +60,7 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws AlgebricksException {
return new AbstractOneInputOneOutputOneFramePushRuntime() {
@@ -70,9 +70,9 @@
@Override
public void open() throws HyracksDataException {
- initAccessAppendRef(context);
+ initAccessAppendRef(ctx);
try {
- agg = unnestingFactory.createUnnestingEvaluator();
+ agg = unnestingFactory.createUnnestingEvaluator(ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java
index e4db65d..717fe23 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -34,7 +35,7 @@
private static final long serialVersionUID = 1L;
@Override
- public IUnnestingEvaluator createUnnestingEvaluator() throws AlgebricksException {
+ public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
return new IUnnestingEvaluator() {
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
index c2f3774..10e2ff2 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
@@ -38,13 +39,13 @@
}
@Override
- public IScalarEvaluator createScalarEvaluator() throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
return new IScalarEvaluator() {
private IPointable p = VoidPointable.FACTORY.createPointable();
private ArrayBackedValueStorage argOut = new ArrayBackedValueStorage();
- private IScalarEvaluator evalLeft = evalLeftFactory.createScalarEvaluator();
- private IScalarEvaluator evalRight = evalRightFactory.createScalarEvaluator();
+ private IScalarEvaluator evalLeft = evalLeftFactory.createScalarEvaluator(ctx);
+ private IScalarEvaluator evalRight = evalRightFactory.createScalarEvaluator(ctx);
@SuppressWarnings("static-access")
@Override
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
index e99d743..431ad51 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
@@ -38,7 +39,7 @@
}
@Override
- public IScalarEvaluator createScalarEvaluator() throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
return new IScalarEvaluator() {
private ArrayBackedValueStorage buf = new ArrayBackedValueStorage();
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
index 14d2976..d3d2938 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.BooleanPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -35,11 +36,11 @@
}
@Override
- public IScalarEvaluator createScalarEvaluator() throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
return new IScalarEvaluator() {
private IPointable p = VoidPointable.FACTORY.createPointable();
- private IScalarEvaluator eval1 = evalFact1.createScalarEvaluator();
- private IScalarEvaluator eval2 = evalFact2.createScalarEvaluator();
+ private IScalarEvaluator eval1 = evalFact1.createScalarEvaluator(ctx);
+ private IScalarEvaluator eval2 = evalFact2.createScalarEvaluator(ctx);
private byte[] rBytes = new byte[1];
@Override
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
index a50a500..8f37922 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.BooleanPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
@@ -35,11 +36,11 @@
}
@Override
- public IScalarEvaluator createScalarEvaluator() throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
return new IScalarEvaluator() {
private IPointable p = VoidPointable.FACTORY.createPointable();
- private IScalarEvaluator eval1 = evalFact1.createScalarEvaluator();
- private IScalarEvaluator eval2 = evalFact2.createScalarEvaluator();
+ private IScalarEvaluator eval1 = evalFact1.createScalarEvaluator(ctx);
+ private IScalarEvaluator eval2 = evalFact2.createScalarEvaluator(ctx);
private byte[] rBytes = new byte[1];
@Override
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 46fe2fe..3de0966 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -182,7 +182,7 @@
IScalarEvaluatorFactory cond = new IntegerGreaterThanEvalFactory(new IntegerConstantEvalFactory(2),
new TupleFieldEvaluatorFactory(0));
StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 0 },
- BinaryBooleanInspectorImpl.INSTANCE);
+ BinaryBooleanInspectorImpl.FACTORY);
RecordDescriptor selectDesc = intScannerDesc;
String filePath = PATH_ACTUAL + SEPARATOR + "scanSelectWrite.out";
@@ -270,7 +270,7 @@
// the algebricks op.
StreamLimitRuntimeFactory limit = new StreamLimitRuntimeFactory(new IntegerConstantEvalFactory(2), null,
- new int[] { 0 }, BinaryIntegerInspectorImpl.INSTANCE);
+ new int[] { 0 }, BinaryIntegerInspectorImpl.FACTORY);
RecordDescriptor limitDesc = new RecordDescriptor(
new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
@@ -435,7 +435,7 @@
IScalarEvaluatorFactory cond = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
new TupleFieldEvaluatorFactory(0)); // Canadian customers
StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 1 },
- BinaryBooleanInspectorImpl.INSTANCE);
+ BinaryBooleanInspectorImpl.FACTORY);
RecordDescriptor selectDesc = new RecordDescriptor(
new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
@@ -505,7 +505,7 @@
IScalarEvaluatorFactory cond = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
new TupleFieldEvaluatorFactory(0)); // Canadian customers
StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 1 },
- BinaryBooleanInspectorImpl.INSTANCE);
+ BinaryBooleanInspectorImpl.FACTORY);
RecordDescriptor selectDesc = new RecordDescriptor(
new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
@@ -838,7 +838,7 @@
IScalarEvaluatorFactory cond = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
new TupleFieldEvaluatorFactory(0)); // Canadian customers
StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 1 },
- BinaryBooleanInspectorImpl.INSTANCE);
+ BinaryBooleanInspectorImpl.FACTORY);
RecordDescriptor selectDesc = new RecordDescriptor(
new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
index e3c71ea..2999648 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -26,4 +26,6 @@
public JobId getJobId();
public ICounterContext getCounterContext();
+
+ public Object getGlobalJobData();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
index 26cb525..297b56c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
@@ -16,7 +16,8 @@
import java.io.Serializable;
-public interface ITuplePairComparatorFactory extends Serializable {
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
- public ITuplePairComparator createTuplePairComparator();
-}
+public interface ITuplePairComparatorFactory extends Serializable {
+ public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java
new file mode 100644
index 0000000..bee495a
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+public interface IGlobalJobDataFactory extends Serializable {
+ public Object createGlobalJobData(IHyracksJobletContext ctx);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 495f6c3..84ebedb 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -60,6 +60,8 @@
private IJobletEventListenerFactory jobletEventListenerFactory;
+ private IGlobalJobDataFactory globalJobDataFactory;
+
private transient int operatorIdCounter;
private transient int connectorIdCounter;
@@ -232,6 +234,14 @@
this.jobletEventListenerFactory = jobletEventListenerFactory;
}
+ public IGlobalJobDataFactory getGlobalJobDataFactory() {
+ return globalJobDataFactory;
+ }
+
+ public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+ this.globalJobDataFactory = globalJobDataFactory;
+ }
+
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 792b879..ec7cf89 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -32,7 +32,9 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IGlobalJobDataFactory;
import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -74,7 +76,9 @@
private final IWorkspaceFileFactory fileFactory;
- private IJobletEventListener jobletEventListener;
+ private final Object globalJobData;
+
+ private final IJobletEventListener jobletEventListener;
private JobStatus cleanupStatus;
@@ -93,6 +97,16 @@
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
cleanupPending = false;
+ IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
+ if (jelf != null) {
+ IJobletEventListener listener = jelf.createListener(this);
+ this.jobletEventListener = listener;
+ listener.jobletStart();
+ } else {
+ jobletEventListener = null;
+ }
+ IGlobalJobDataFactory gjdf = jag.getJobSpecification().getGlobalJobDataFactory();
+ globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
}
@Override
@@ -221,6 +235,11 @@
return counter;
}
+ @Override
+ public Object getGlobalJobData() {
+ return globalJobData;
+ }
+
public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
IPartitionCollector collector, PartitionState minState) throws Exception {
for (PartitionId pid : pids) {
@@ -241,10 +260,6 @@
return jobletEventListener;
}
- public void setJobletEventListener(IJobletEventListener jobletEventListener) {
- this.jobletEventListener = jobletEventListener;
- }
-
public void cleanup(JobStatus status) {
cleanupStatus = status;
cleanupPending = true;
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index a101612..368539c 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -37,8 +37,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IJobletEventListener;
-import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -168,12 +166,6 @@
throw new NullPointerException("JobActivityGraph was null");
}
ji = new Joblet(ncs, jobId, appCtx, jag);
- IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
- if (jelf != null) {
- IJobletEventListener listener = jelf.createListener(ji);
- ji.setJobletEventListener(listener);
- listener.jobletStart();
- }
jobletMap.put(jobId, ji);
}
return ji;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/JoinComparatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/JoinComparatorFactory.java
index 6e120f0..377e9e0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/JoinComparatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/JoinComparatorFactory.java
@@ -1,10 +1,10 @@
package edu.uci.ics.hyracks.dataflow.std.join;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-
public class JoinComparatorFactory implements ITuplePairComparatorFactory {
private static final long serialVersionUID = 1L;
@@ -19,7 +19,7 @@
}
@Override
- public ITuplePairComparator createTuplePairComparator() {
+ public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 1dfe6ca..8d4f57a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -48,8 +48,8 @@
private final ITuplePairComparatorFactory comparatorFactory;
private final int memSize;
- public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, ITuplePairComparatorFactory comparatorFactory,
- RecordDescriptor recordDescriptor, int memSize) {
+ public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize) {
super(spec, 2, 1);
this.comparatorFactory = comparatorFactory;
this.recordDescriptors[0] = recordDescriptor;
@@ -105,7 +105,7 @@
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
- final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
+ final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private JoinCacheTaskState state;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index f1e39a2..da02f3d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -63,32 +63,32 @@
* of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe, where in our implementation Probe phase
* can apply HHJ recursively, based on the value of M and size of R and S. HHJ phases proceed as follow:
* BUILD:
- * – Calculate number of partitions (Based on the size of R, fudge factor and M) [See Shapiro's paper for the detailed discussion].
- * – Initialize the build phase (one frame per partition – all partitions considered resident at first)
- * – Read tuples of R, frame by frame, and hash each tuple (based on a given hash function) to find
+ * Calculate number of partitions (Based on the size of R, fudge factor and M) [See Shapiro's paper for the detailed discussion].
+ * Initialize the build phase (one frame per partition, all partitions considered resident at first)
+ * Read tuples of R, frame by frame, and hash each tuple (based on a given hash function) to find
* its target partition and try to append it to that partition:
- * – If target partition's buffer is full, try to allocate a new buffer for it.
- * – if no free buffer is available, find the largest resident partition and spill it. Using its freed
+ * If target partition's buffer is full, try to allocate a new buffer for it.
+ * if no free buffer is available, find the largest resident partition and spill it. Using its freed
* buffers after spilling, allocate a new buffer for the target partition.
- * – Being done with R, close the build phase. (During closing we write the very last buffer of each
+ * Being done with R, close the build phase. (During closing we write the very last buffer of each
* spilled partition to the disk, and we do partition tuning, where we try to bring back as many buffers, belonging to
* spilled partitions as possible into memory, based on the free buffers - We will stop at the point where remaining free buffers is not enough
* for reloading an entire partition back into memory)
- * – Create the hash table for the resident partitions (basically we create an in-memory hash join here)
+ * Create the hash table for the resident partitions (basically we create an in-memory hash join here)
* PROBE:
- * – Initialize the probe phase on S (mainly allocate one buffer per spilled partition, and one buffer
+ * Initialize the probe phase on S (mainly allocate one buffer per spilled partition, and one buffer
* for the whole resident partitions)
- * – Read tuples of S, frame by frame and hash each tuple T to its target partition P
- * – if P is a resident partition, pass T to the in-memory hash join and generate the output record,
+ * Read tuples of S, frame by frame and hash each tuple T to its target partition P
+ * if P is a resident partition, pass T to the in-memory hash join and generate the output record,
* if any matching(s) record found
- * – if P is spilled, write T to the dedicated buffer for P (on the probe side)
- * – Once scanning of S is done, we try to join partition pairs (Ri, Si) of the spilled partitions:
- * – if any of Ri or Si is smaller than M, then we simply use an in-memory hash join to join them
- * – otherwise we apply HHJ recursively:
- * – if after applying HHJ recursively, we do not gain enough size reduction (max size of the
+ * if P is spilled, write T to the dedicated buffer for P (on the probe side)
+ * Once scanning of S is done, we try to join partition pairs (Ri, Si) of the spilled partitions:
+ * if any of Ri or Si is smaller than M, then we simply use an in-memory hash join to join them
+ * otherwise we apply HHJ recursively:
+ * if after applying HHJ recursively, we do not gain enough size reduction (max size of the
* resulting partitions were more than 80% of the initial Ri,Si size) then we switch to
* nested loop join for joining.
- * – (At each step of partition-pair joining, we consider role reversal, which means if size of Si were
+ * (At each step of partition-pair joining, we consider role reversal, which means if size of Si were
* greater than Ri, then we make sure that we switch the roles of build/probe between them)
*/
@@ -116,8 +116,8 @@
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
- public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, double factor,
- int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
+ public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory0,
ITuplePairComparatorFactory tupPaircomparatorFactory1, boolean isLeftOuter,
@@ -139,8 +139,8 @@
}
- public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, double factor,
- int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
+ public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
throws HyracksDataException {
@@ -325,8 +325,8 @@
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator();
- final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator();
+ final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
+ final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 6411390..1e60372 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -60,7 +61,7 @@
}
@Override
- public ITuplePairComparator createTuplePairComparator() {
+ public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
}
}
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index c9a6254..eda17c8 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -315,7 +316,7 @@
}
@Override
- public ITuplePairComparator createTuplePairComparator() {
+ public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
}
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITupleFilterFactory.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITupleFilterFactory.java
index 4633fc9..602902d 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITupleFilterFactory.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITupleFilterFactory.java
@@ -17,6 +17,8 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
public interface ITupleFilterFactory extends Serializable {
- public ITupleFilter createTupleFilter() throws Exception;
+ public ITupleFilter createTupleFilter(IHyracksTaskContext ctx) throws Exception;
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 3933de6..b839c72 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -67,7 +67,7 @@
treeIndexHelper.getSearchOperationCallback());
ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
if (tupleFilterFactory != null) {
- tupleFilter = tupleFilterFactory.createTupleFilter();
+ tupleFilter = tupleFilterFactory.createTupleFilter(treeIndexHelper.ctx);
frameTuple = new FrameTupleReference();
}
} catch (Exception e) {
@@ -139,4 +139,4 @@
public void fail() throws HyracksDataException {
writer.fail();
}
-}
\ No newline at end of file
+}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
index 4f84466..7231f67 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
@@ -88,4 +88,9 @@
public JobId getJobId() {
return jobId;
}
+
+ @Override
+ public Object getGlobalJobData() {
+ return null;
+ }
}
\ No newline at end of file