[NO ISSUE][COMP] Support for running queries during optimization
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Provide support for running helper queries during optimization
- Introduce a new logical ruleset for running sampling queries
Change-Id: I457063fef269ae00947169d663d828488c67c2ee
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16143
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index 7a32f42..f542af3 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -202,6 +202,10 @@
<artifactId>hyracks-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/DefaultRuleSetFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/DefaultRuleSetFactory.java
index ba42036..76404ff 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/DefaultRuleSetFactory.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/DefaultRuleSetFactory.java
@@ -23,23 +23,33 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.optimizer.base.RuleCollections;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFirstRuleCheckFixpointRuleController;
import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
public class DefaultRuleSetFactory implements IRuleSetFactory {
@Override
public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(
- ICcApplicationContext appCtx) throws AlgebricksException {
+ ICcApplicationContext appCtx) {
return buildLogical(appCtx);
}
@Override
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(IRuleSetKind ruleSetKind,
+ ICcApplicationContext appCtx) {
+ if (ruleSetKind == RuleSetKind.SAMPLING) {
+ return buildLogicalSampling();
+ } else {
+ throw new IllegalArgumentException(String.valueOf(ruleSetKind));
+ }
+ }
+
+ @Override
public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites(
ICcApplicationContext appCtx) {
return buildPhysical(appCtx);
@@ -76,6 +86,14 @@
return defaultLogicalRewrites;
}
+ public static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildLogicalSampling() {
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites = new ArrayList<>();
+ SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
+ logicalRewrites.add(new Pair<>(seqCtrlNoDfs, RuleCollections.buildConsolidationRuleCollection()));
+ logicalRewrites.add(new Pair<>(seqCtrlNoDfs, RuleCollections.buildPlanCleanupRuleCollection()));
+ return logicalRewrites;
+ }
+
public static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildPhysical(
ICcApplicationContext appCtx) {
List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> defaultPhysicalRewrites = new ArrayList<>();
@@ -88,5 +106,4 @@
defaultPhysicalRewrites.add(new Pair<>(seqOnceCtrl, RuleCollections.prepareForJobGenRuleCollection()));
return defaultPhysicalRewrites;
}
-
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/IRuleSetFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/IRuleSetFactory.java
index 2300e4a..c643d21 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/IRuleSetFactory.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/IRuleSetFactory.java
@@ -21,24 +21,30 @@
import java.util.List;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
public interface IRuleSetFactory {
+ enum RuleSetKind implements IRuleSetKind {
+ SAMPLING
+ }
+
/**
* @return the logical rewrites
- * @throws AlgebricksException
*/
- public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(
- ICcApplicationContext appCtx) throws AlgebricksException;
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(ICcApplicationContext appCtx);
+
+ /**
+ * @return the logical rewrites of the specified kind
+ */
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(IRuleSetKind ruleSetKind,
+ ICcApplicationContext appCtx);
/**
* @return the physical rewrites
*/
- public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites(
- ICcApplicationContext appCtx) throws AlgebricksException;
-
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites(ICcApplicationContext appCtx);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 8b5c8bc..b6f79d9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -18,32 +18,70 @@
*/
package org.apache.asterix.optimizer.base;
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.declared.ResultSetDataSink;
+import org.apache.asterix.metadata.declared.ResultSetSinkId;
+import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.optimizer.rules.am.AccessMethodUtils;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.translator.ResultMetadata;
+import org.apache.asterix.translator.SessionConfig;
import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.compiler.api.ICompiler;
+import org.apache.hyracks.algebricks.compiler.api.ICompilerFactory;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleInputStream;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public class AnalysisUtil {
+
+ private static final List<FunctionIdentifier> fieldAccessFunctions =
+ Arrays.asList(BuiltinFunctions.GET_DATA, BuiltinFunctions.GET_HANDLE, BuiltinFunctions.TYPE_OF);
+
/*
* If the first child of op is of type opType, then it returns that child,
* o/w returns null.
@@ -204,11 +242,86 @@
return true;
}
- private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<>();
+ public static List<List<IAObject>> runQuery(Mutable<ILogicalOperator> topOp, List<LogicalVariable> resultVars,
+ IOptimizationContext queryOptCtx, IRuleSetKind ruleSetKind) throws AlgebricksException {
- static {
- fieldAccessFunctions.add(BuiltinFunctions.GET_DATA);
- fieldAccessFunctions.add(BuiltinFunctions.GET_HANDLE);
- fieldAccessFunctions.add(BuiltinFunctions.TYPE_OF);
+ MetadataProvider metadataProvider = (MetadataProvider) queryOptCtx.getMetadataProvider();
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+ TxnId mainTxnId = metadataProvider.getTxnId();
+ try {
+ TxnId newTxnId = metadataProvider.getTxnIdFactory().create();
+ metadataProvider.setTxnId(newTxnId);
+
+ IVariableTypeEnvironment topOpTypeEnv = queryOptCtx.getOutputTypeEnvironment(topOp.getValue());
+ SerializerDeserializerProvider serdeProvider = SerializerDeserializerProvider.INSTANCE;
+
+ int nFields = resultVars.size();
+ List<Mutable<ILogicalExpression>> resultExprList = new ArrayList<>(nFields);
+ List<ISerializerDeserializer<?>> resultSerdeList = new ArrayList<>(nFields);
+
+ for (LogicalVariable var : resultVars) {
+ Object varType = topOpTypeEnv.getVarType(var);
+ if (varType == null) {
+ throw new IllegalArgumentException("Cannot determine type of " + var);
+ }
+ resultSerdeList.add(serdeProvider.getSerializerDeserializer(varType));
+ resultExprList.add(new MutableObject<>(new VariableReferenceExpression(var)));
+ }
+
+ ResultMetadata resultMetadata = new ResultMetadata(SessionConfig.OutputFormat.ADM);
+ ResultSetId resultSetId = new ResultSetId(metadataProvider.getResultSetIdCounter().getAndInc());
+ ResultSetSinkId rssId = new ResultSetSinkId(resultSetId);
+ ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
+
+ DistributeResultOperator resultOp = new DistributeResultOperator(resultExprList, sink, resultMetadata);
+ resultOp.getInputs().add(topOp);
+ queryOptCtx.computeAndSetTypeEnvironmentForOperator(resultOp);
+
+ MutableObject<ILogicalOperator> newResultOpRef = new MutableObject<>(resultOp);
+
+ ICompilerFactory compilerFactory = (ICompilerFactory) queryOptCtx.getCompilerFactory();
+ ICompiler compiler =
+ compilerFactory.createCompiler(new ALogicalPlanImpl(newResultOpRef), queryOptCtx, ruleSetKind);
+ compiler.optimize();
+
+ JobSpecification jobSpec = compiler.createJob(appCtx, new JobEventListenerFactory(newTxnId, false));
+
+ JobId jobId = JobUtils.runJob(appCtx.getHcc(), jobSpec, true);
+
+ IResultSetReader resultSetReader = appCtx.getResultSet().createReader(jobId, resultSetId);
+ FrameManager frameManager = new FrameManager(queryOptCtx.getPhysicalOptimizationConfig().getFrameSize());
+ IFrame frame = new VSizeFrame(frameManager);
+
+ FrameTupleAccessor fta = new FrameTupleAccessor(null);
+ ByteArrayAccessibleInputStream bais = new ByteArrayAccessibleInputStream(frame.getBuffer().array(), 0, 0);
+ DataInputStream dis = new DataInputStream(bais);
+ List<List<IAObject>> result = new ArrayList<>();
+
+ while (resultSetReader.read(frame) > 0) {
+ ByteBuffer buffer = frame.getBuffer();
+ fta.reset(buffer);
+ int nTuples = fta.getTupleCount();
+ for (int tupleIdx = 0; tupleIdx < nTuples; tupleIdx++) {
+ int tupleStart = fta.getTupleStartOffset(tupleIdx);
+ int tupleEnd = fta.getTupleEndOffset(tupleIdx);
+ bais.setContent(buffer.array(), tupleStart, tupleEnd - tupleStart);
+
+ List<IAObject> values = new ArrayList<>(nFields);
+ for (int fieldIdx = 0; fieldIdx < nFields; fieldIdx++) {
+ IAObject value = (IAObject) resultSerdeList.get(fieldIdx).deserialize(dis);
+ values.add(value);
+ }
+ result.add(values);
+ }
+ }
+
+ return result;
+ } catch (AlgebricksException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ } finally {
+ metadataProvider.setTxnId(mainTxnId);
+ }
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java
index 51c03eb..7b2dd7a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.IMissableTypeComputer;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -39,17 +40,24 @@
public final class AsterixOptimizationContext extends AlgebricksOptimizationContext {
- private final Int2ObjectMap<Set<DataSource>> dataSourceMap = new Int2ObjectOpenHashMap<>();
+ private final Int2ObjectOpenHashMap<Set<DataSource>> dataSourceMap;
- public AsterixOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ public AsterixOptimizationContext(IOptimizationContextFactory optContextFactory, int varCounter,
+ IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer,
IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations, IPlanPrettyPrinter prettyPrinter,
IWarningCollector warningCollector) {
- super(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
- nullableTypeComputer, conflictingTypeResovler, physicalOptimizationConfig, clusterLocations,
- prettyPrinter, warningCollector);
+ super(optContextFactory, varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory,
+ expressionTypeComputer, nullableTypeComputer, conflictingTypeResovler, physicalOptimizationConfig,
+ clusterLocations, prettyPrinter, warningCollector);
+ dataSourceMap = new Int2ObjectOpenHashMap<>();
+ }
+
+ public AsterixOptimizationContext(AsterixOptimizationContext from) {
+ super(from);
+ dataSourceMap = from.dataSourceMap.clone();
}
public void addDataSource(DataSource dataSource) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 5408915..7202f74 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -29,7 +29,6 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.NoOpWarningCollector;
import org.apache.asterix.common.exceptions.WarningCollector;
import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
@@ -89,11 +88,14 @@
import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
@@ -150,8 +152,9 @@
jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, SerializerDeserializerProvider.INSTANCE,
BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE,
BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
- BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE,
- NullWriterFactory.INSTANCE, UnnestingPositionWriterFactory.INSTANCE, null,
+ BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
+ ResultSerializerFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
+ UnnestingPositionWriterFactory.INSTANCE, null,
new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null,
NoOpWarningCollector.INSTANCE, 0, new PhysicalOptimizationConfig());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java
similarity index 96%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java
index 94360a1..78f84ff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.api.common;
+package org.apache.asterix.translator;
import java.util.List;
import java.util.Set;
-import org.apache.asterix.translator.SessionConfig;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.result.IResultMetadata;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 8716f78..9b9dfac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -40,6 +40,7 @@
import org.apache.asterix.common.api.IResponsePrinter;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -71,6 +72,7 @@
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.ExecutionPlans;
import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.ResultMetadata;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.asterix.utils.ResourceUtils;
@@ -95,6 +97,8 @@
import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -152,11 +156,16 @@
IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector) {
IPlanPrettyPrinter prettyPrinter = PlanPrettyPrinter.createStringPlanPrettyPrinter();
- return new AsterixOptimizationContext(varCounter, expressionEvalSizeComputer,
+ return new AsterixOptimizationContext(this, varCounter, expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, prettyPrinter,
warningCollector);
}
+
+ @Override
+ public IOptimizationContext cloneOptimizationContext(IOptimizationContext oc) {
+ return new AsterixOptimizationContext((AsterixOptimizationContext) oc);
+ }
}
public Pair<IReturningStatement, Integer> reWriteQuery(LangRewritingContext langRewritingContext,
@@ -205,7 +214,8 @@
&& conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
generateLogicalPlan(plan, output.config().getPlanFormat());
}
- CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
+ ICcApplicationContext ccAppContext = metadataProvider.getApplicationContext();
+ CompilerProperties compilerProperties = ccAppContext.getCompilerProperties();
Map<String, Object> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc);
final PhysicalOptimizationConfig physOptConf =
OptimizationConfUtil.createPhysicalOptimizationConf(compilerProperties, querySpecificConfig, sourceLoc);
@@ -213,8 +223,9 @@
HeuristicCompilerFactoryBuilder builder =
new HeuristicCompilerFactoryBuilder(OptimizationContextFactory.INSTANCE);
builder.setPhysicalOptimizationConfig(physOptConf);
- builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites(metadataProvider.getApplicationContext()));
- builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites(metadataProvider.getApplicationContext()));
+ builder.setLogicalRewrites(() -> ruleSetFactory.getLogicalRewrites(ccAppContext));
+ builder.setLogicalRewritesByKind(kind -> ruleSetFactory.getLogicalRewrites(kind, ccAppContext));
+ builder.setPhysicalRewrites(() -> ruleSetFactory.getPhysicalRewrites(ccAppContext));
IDataFormat format = metadataProvider.getDataFormat();
ICompilerFactory compilerFactory = builder.create();
builder.setExpressionEvalSizeComputer(format.getExpressionEvalSizeComputer());
@@ -232,6 +243,24 @@
chooseLocations(clusterInfoCollector, parallelism, metadataProvider.getClusterLocations());
builder.setClusterLocations(computationLocations);
+ builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory());
+ builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory());
+ builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider());
+ builder.setExpressionRuntimeProvider(
+ new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())));
+ builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider());
+ builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
+ builder.setMissingWriterFactory(format.getMissingWriterFactory());
+ builder.setNullWriterFactory(format.getNullWriterFactory());
+ builder.setUnnestingPositionWriterFactory(format.getUnnestingPositionWriterFactory());
+ builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider());
+ builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt()));
+ builder.setWriterFactory(PrinterBasedWriterFactory.INSTANCE);
+ builder.setResultSerializerFactoryProvider(ResultSerializerFactoryProvider.INSTANCE);
+ builder.setSerializerDeserializerProvider(format.getSerdeProvider());
+ builder.setTypeTraitProvider(format.getTypeTraitProvider());
+ builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
+
ICompiler compiler = compilerFactory.createCompiler(plan, metadataProvider, t.getVarCounter());
if (conf.isOptimize()) {
compiler.optimize();
@@ -280,32 +309,16 @@
return null;
}
- builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory());
- builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory());
- builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider());
- builder.setExpressionRuntimeProvider(
- new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())));
- builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider());
- builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
- builder.setMissingWriterFactory(format.getMissingWriterFactory());
- builder.setNullWriterFactory(format.getNullWriterFactory());
- builder.setUnnestingPositionWriterFactory(format.getUnnestingPositionWriterFactory());
- builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider());
- builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt()));
- builder.setSerializerDeserializerProvider(format.getSerdeProvider());
- builder.setTypeTraitProvider(format.getTypeTraitProvider());
- builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
-
JobEventListenerFactory jobEventListenerFactory =
new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction());
- JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory);
+ JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory);
if (isQuery) {
if (requestParameters == null || !requestParameters.isSkipAdmissionPolicy()) {
// Sets a required capacity, only for read-only queries.
// DDLs and DMLs are considered not that frequent.
// limit the computation locations to the locations that will be used in the query
- final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
+ final INodeJobTracker nodeJobTracker = ccAppContext.getNodeJobTracker();
final AlgebricksAbsolutePartitionConstraint jobLocations =
getJobLocations(spec, nodeJobTracker, computationLocations);
final IClusterCapacity jobRequiredCapacity =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 981cdc9..bfebfd6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
-import org.apache.asterix.api.common.ResultMetadata;
import org.apache.asterix.app.result.ResponseMetrics;
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.app.result.ResultHandle;
@@ -31,6 +30,7 @@
import org.apache.asterix.app.result.fields.ResultsPrinter;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.ResultMetadata;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.exceptions.ErrorCode;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index 05073f9..201a470 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -23,8 +23,8 @@
import java.util.Iterator;
import java.util.Set;
-import org.apache.asterix.api.common.ResultMetadata;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.translator.ResultMetadata;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/SignaturePrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/SignaturePrinter.java
index 048584c..3f1e632 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/SignaturePrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/SignaturePrinter.java
@@ -22,7 +22,6 @@
import java.util.LinkedHashSet;
import java.util.List;
-import org.apache.asterix.api.common.ResultMetadata;
import org.apache.asterix.api.http.server.ResultUtil;
import org.apache.asterix.common.annotations.IRecordTypeAnnotation;
import org.apache.asterix.common.annotations.RecordFieldOrderAnnotation;
@@ -34,6 +33,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.translator.ExecutionPlans;
+import org.apache.asterix.translator.ResultMetadata;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.util.JSONUtil;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8a010ca..de3492c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -236,13 +236,11 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.Counter;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.data.IAWriterFactory;
-import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -331,10 +329,8 @@
public void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
validateStatements(requestParameters);
trackRequest(requestParameters);
- int resultSetIdCounter = 0;
+ Counter resultSetIdCounter = new Counter(0);
FileSplit outputFile = null;
- IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
- IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
String threadName = Thread.currentThread().getName();
Thread.currentThread().setName(
QueryTranslator.class.getSimpleName() + ":" + requestParameters.getRequestReference().getUuid());
@@ -354,8 +350,8 @@
}
validateOperation(appCtx, activeDataverse, stmt);
MetadataProvider metadataProvider = MetadataProvider.create(appCtx, activeDataverse);
- configureMetadataProvider(metadataProvider, config, resultSerializerFactoryProvider, writerFactory,
- outputFile, requestParameters, stmt);
+ configureMetadataProvider(metadataProvider, config, resultSetIdCounter, outputFile, requestParameters,
+ stmt);
IStatementRewriter stmtRewriter = rewriterFactory.createStatementRewriter();
rewriteStatement(stmt, stmtRewriter, metadataProvider); // Rewrite the statement's AST.
Statement.Kind kind = stmt.getKind();
@@ -445,7 +441,7 @@
case INSERT:
case UPSERT:
if (((InsertStatement) stmt).getReturnExpression() != null) {
- metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+ metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter.getAndInc()));
metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
|| resultDelivery == ResultDelivery.DEFERRED);
metadataProvider.setMaxResultReads(maxResultReads);
@@ -481,7 +477,7 @@
handleCreateFeedPolicyStatement(metadataProvider, stmt);
break;
case QUERY:
- metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+ metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter.getAndInc()));
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
metadataProvider.setMaxResultReads(maxResultReads);
@@ -507,7 +503,8 @@
final ExtensionStatement extStmt = (ExtensionStatement) stmt;
statementProperties.setName(extStmt.getName());
if (!isCompileOnly()) {
- extStmt.handle(hcc, this, requestParameters, metadataProvider, resultSetIdCounter);
+ extStmt.handle(hcc, this, requestParameters, metadataProvider,
+ resultSetIdCounter.getAndInc());
}
break;
default:
@@ -525,14 +522,13 @@
}
protected void configureMetadataProvider(MetadataProvider metadataProvider, Map<String, String> config,
- IResultSerializerFactoryProvider resultSerializerFactoryProvider, IAWriterFactory writerFactory,
- FileSplit outputFile, IRequestParameters requestParameters, Statement statement) {
+ Counter resultSetIdCounter, FileSplit outputFile, IRequestParameters requestParameters,
+ Statement statement) {
if (statement.getKind() == Statement.Kind.QUERY && requestParameters.isSQLCompatMode()) {
metadataProvider.getConfig().put(SqlppQueryRewriter.SQL_COMPAT_OPTION, Boolean.TRUE.toString());
}
metadataProvider.getConfig().putAll(config);
- metadataProvider.setWriterFactory(writerFactory);
- metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
+ metadataProvider.setResultSetIdCounter(resultSetIdCounter);
metadataProvider.setOutputFile(outputFile);
}
@@ -4369,8 +4365,8 @@
private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
final ClusterControllerService controllerService =
(ClusterControllerService) appCtx.getServiceContext().getControllerService();
- org.apache.asterix.api.common.ResultMetadata resultMetadata =
- (org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService()
+ org.apache.asterix.translator.ResultMetadata resultMetadata =
+ (org.apache.asterix.translator.ResultMetadata) controllerService.getResultDirectoryService()
.getResultMetadata(jobId, rsId);
stats.setProcessedObjects(resultMetadata.getProcessedObjects());
if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java
index a036b7e..6ac805b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java
@@ -20,27 +20,10 @@
package org.apache.asterix.common.exceptions;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-public final class NoOpWarningCollector implements IWarningCollector {
-
- public static final IWarningCollector INSTANCE = new NoOpWarningCollector();
+public final class NoOpWarningCollector {
+ public static final IWarningCollector INSTANCE = org.apache.hyracks.api.exceptions.NoOpWarningCollector.INSTANCE;
private NoOpWarningCollector() {
}
-
- @Override
- public void warn(Warning warning) {
- // no-op
- }
-
- @Override
- public boolean shouldWarn() {
- return false;
- }
-
- @Override
- public long getTotalWarningsCount() {
- return 0;
- }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1cdbbea..59e0986 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -115,6 +115,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.Counter;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
@@ -183,12 +184,11 @@
private Dataverse defaultDataverse;
private MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
- private IAWriterFactory writerFactory;
private FileSplit outputFile;
private boolean asyncResults;
private long maxResultReads;
private ResultSetId resultSetId;
- private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
+ private Counter resultSetIdCounter;
private TxnId txnId;
private Map<String, Integer> externalDataLocks;
private boolean blockingOperatorDisabled = false;
@@ -263,10 +263,6 @@
this.isWriteTransaction = writeTransaction;
}
- public void setWriterFactory(IAWriterFactory writerFactory) {
- this.writerFactory = writerFactory;
- }
-
public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
this.mdTxnCtx = mdTxnCtx;
}
@@ -275,10 +271,6 @@
return mdTxnCtx;
}
- public IAWriterFactory getWriterFactory() {
- return this.writerFactory;
- }
-
public FileSplit getOutputFile() {
return outputFile;
}
@@ -311,12 +303,12 @@
this.resultSetId = resultSetId;
}
- public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) {
- this.resultSerializerFactoryProvider = rafp;
+ public Counter getResultSetIdCounter() {
+ return resultSetIdCounter;
}
- public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
- return resultSerializerFactoryProvider;
+ public void setResultSetIdCounter(Counter resultSetIdCounter) {
+ this.resultSetIdCounter = resultSetIdCounter;
}
public boolean isWriteTransaction() {
@@ -686,7 +678,8 @@
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+ int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
+ RecordDescriptor inputDesc) {
FileSplitDataSink fsds = (FileSplitDataSink) sink;
FileSplitSinkId fssi = fsds.getId();
FileSplit fs = fssi.getFileSplit();
@@ -694,14 +687,15 @@
String nodeId = fs.getNodeName();
SinkWriterRuntimeFactory runtime =
- new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, getWriterFactory(), inputDesc);
+ new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, writerFactory, inputDesc);
AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
return new Pair<>(runtime, apc);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc,
+ int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
+ IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
IResultMetadata metadata, JobSpecification spec) throws AlgebricksException {
ResultSetDataSink rsds = (ResultSetDataSink) sink;
ResultSetSinkId rssId = rsds.getId();
@@ -709,7 +703,7 @@
ResultWriterOperatorDescriptor resultWriter = null;
try {
IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
- .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
+ .getResultSerializerFactoryProvider(printColumns, printerFactories, writerFactory);
resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, metadata, getResultAsyncMode(),
resultSerializedAppenderFactory, getMaxResultReads());
} catch (IOException e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 92390a7..0ee9516 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -30,7 +30,6 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.MetadataException;
-import org.apache.asterix.common.exceptions.NoOpWarningCollector;
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
@@ -62,6 +61,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
/**
* A utility class for providing helper functions for feeds TODO: Refactor this
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index b899e16..c6eafe0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -30,7 +30,6 @@
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.TransactionState;
import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.common.exceptions.NoOpWarningCollector;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
@@ -64,6 +63,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml b/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
index fbe9dbc..f557d7a 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
@@ -59,6 +59,11 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-runtime</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index c22d54d..3a2fe3b 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.algebricks.compiler.api;
import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -32,7 +34,9 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
@@ -40,6 +44,7 @@
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
@@ -49,8 +54,9 @@
public abstract class AbstractCompilerFactoryBuilder {
- protected List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
- protected List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
+ protected Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewrites;
+ protected Function<IRuleSetKind, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewritesByKind;
+ protected Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> physicalRewrites;
protected ITypeTraitProvider typeTraitProvider;
protected ISerializerDeserializerProvider serializerDeserializerProvider;
protected IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
@@ -59,6 +65,8 @@
protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
protected IPrinterFactoryProvider printerProvider;
+ protected IAWriterFactory writerFactory;
+ protected IResultSerializerFactoryProvider resultSerializerFactoryProvider;
protected IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
protected IExpressionRuntimeProvider expressionRuntimeProvider;
protected IExpressionTypeComputer expressionTypeComputer;
@@ -78,11 +86,18 @@
public abstract ICompilerFactory create();
- public void setLogicalRewrites(List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites) {
+ public void setLogicalRewrites(
+ Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewrites) {
this.logicalRewrites = logicalRewrites;
}
- public void setPhysicalRewrites(List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites) {
+ public void setLogicalRewritesByKind(
+ Function<IRuleSetKind, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewritesByKind) {
+ this.logicalRewritesByKind = logicalRewritesByKind;
+ }
+
+ public void setPhysicalRewrites(
+ Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> physicalRewrites) {
this.physicalRewrites = physicalRewrites;
}
@@ -158,6 +173,22 @@
return printerProvider;
}
+ public void setWriterFactory(IAWriterFactory writerFactory) {
+ this.writerFactory = writerFactory;
+ }
+
+ public IAWriterFactory getWriterFactory() {
+ return writerFactory;
+ }
+
+ public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider resultSerializerFactoryProvider) {
+ this.resultSerializerFactoryProvider = resultSerializerFactoryProvider;
+ }
+
+ public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+ return resultSerializerFactoryProvider;
+ }
+
public void setExpressionRuntimeProvider(IExpressionRuntimeProvider expressionRuntimeProvider) {
this.expressionRuntimeProvider = expressionRuntimeProvider;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 891980f..e35a539 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,8 +18,11 @@
*/
package org.apache.hyracks.algebricks.compiler.api;
+import java.util.List;
+
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
@@ -33,10 +36,15 @@
import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.PlanCompiler;
+import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
+import org.apache.hyracks.algebricks.runtime.writers.SerializedDataWriterFactory;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
import org.apache.hyracks.api.job.JobSpecification;
@@ -58,14 +66,19 @@
IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector) {
IPlanPrettyPrinter prettyPrinter = PlanPrettyPrinter.createStringPlanPrettyPrinter();
- return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
+ return new AlgebricksOptimizationContext(this, varCounter, expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, prettyPrinter,
warningCollector);
}
+
+ @Override
+ public IOptimizationContext cloneOptimizationContext(IOptimizationContext oc) {
+ return new AlgebricksOptimizationContext((AlgebricksOptimizationContext) oc);
+ }
}
- private IOptimizationContextFactory optCtxFactory;
+ private final IOptimizationContextFactory optCtxFactory;
public HeuristicCompilerFactoryBuilder() {
this.optCtxFactory = DefaultOptimizationContextFactory.INSTANCE;
@@ -77,42 +90,85 @@
@Override
public ICompilerFactory create() {
- return new ICompilerFactory() {
- @Override
- public ICompiler createCompiler(final ILogicalPlan plan, final IMetadataProvider<?, ?> metadata,
- int varCounter) {
- final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
- expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
- missableTypeComputer, conflictingTypeResolver, physicalOptimizationConfig, clusterLocations,
- warningCollector);
- oc.setMetadataDeclarations(metadata);
- final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
- return new ICompiler() {
-
- @Override
- public void optimize() throws AlgebricksException {
- opt.optimize();
- }
-
- @Override
- public JobSpecification createJob(Object appContext,
- IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
- AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
- JobGenContext context = new JobGenContext(null, metadata, appContext,
- serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
- comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
- binaryIntegerInspectorFactory, printerProvider, missingWriterFactory, nullWriterFactory,
- unnestingPositionWriterFactory, normalizedKeyComputerFactoryProvider,
- expressionRuntimeProvider, expressionTypeComputer, oc, expressionEvalSizeComputer,
- partialAggregationTypeComputer, predEvaluatorFactoryProvider,
- physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector,
- maxWarnings, physicalOptimizationConfig);
- PlanCompiler pc = new PlanCompiler(context);
- return pc.compilePlan(plan, jobEventListenerFactory);
- }
- };
- }
- };
+ return new CompilerFactoryImpl();
}
+ private class CompilerFactoryImpl implements ICompilerFactory {
+ @Override
+ public ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter) {
+ IOptimizationContext optContext =
+ optCtxFactory.createOptimizationContext(varCounter, expressionEvalSizeComputer,
+ mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
+ conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, warningCollector);
+ optContext.setMetadataDeclarations(metadata);
+ optContext.setCompilerFactory(this);
+ return new CompilerImpl(this, plan, optContext, logicalRewrites.get(), physicalRewrites.get(),
+ writerFactory);
+ }
+
+ @Override
+ public ICompiler createCompiler(ILogicalPlan plan, IOptimizationContext newOptContext,
+ IRuleSetKind ruleSetKind) {
+ if (newOptContext.getCompilerFactory() != this) {
+ throw new IllegalStateException();
+ }
+ return new CompilerImpl(this, plan, newOptContext, logicalRewritesByKind.apply(ruleSetKind),
+ physicalRewrites.get(), SerializedDataWriterFactory.WITHOUT_RECORD_DESCRIPTOR);
+ }
+
+ private PlanCompiler createPlanCompiler(IOptimizationContext oc, Object appContext,
+ IAWriterFactory writerFactory) {
+ JobGenContext context = new JobGenContext(null, oc.getMetadataProvider(), appContext,
+ serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
+ comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
+ binaryIntegerInspectorFactory, printerProvider, writerFactory, resultSerializerFactoryProvider,
+ missingWriterFactory, nullWriterFactory, unnestingPositionWriterFactory,
+ normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer, oc,
+ expressionEvalSizeComputer, partialAggregationTypeComputer, predEvaluatorFactoryProvider,
+ physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector, maxWarnings,
+ physicalOptimizationConfig);
+ return new PlanCompiler(context);
+ }
+ }
+
+ private static class CompilerImpl implements ICompiler {
+
+ private final CompilerFactoryImpl factory;
+
+ private final ILogicalPlan plan;
+
+ private final IOptimizationContext oc;
+
+ private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
+
+ private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
+
+ private final IAWriterFactory writerFactory;
+
+ private CompilerImpl(CompilerFactoryImpl factory, ILogicalPlan plan, IOptimizationContext oc,
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites,
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites,
+ IAWriterFactory writerFactory) {
+ this.factory = factory;
+ this.plan = plan;
+ this.oc = oc;
+ this.logicalRewrites = logicalRewrites;
+ this.physicalRewrites = physicalRewrites;
+ this.writerFactory = writerFactory;
+ }
+
+ @Override
+ public void optimize() throws AlgebricksException {
+ HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
+ opt.optimize();
+ }
+
+ @Override
+ public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory)
+ throws AlgebricksException {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
+ PlanCompiler pc = factory.createPlanCompiler(oc, appContext, writerFactory);
+ return pc.compilePlan(plan, jobEventListenerFactory);
+ }
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
index 7c138ea..07a8034 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
@@ -19,8 +19,12 @@
package org.apache.hyracks.algebricks.compiler.api;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
public interface ICompilerFactory {
ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter);
+
+ ICompiler createCompiler(ILogicalPlan plan, IOptimizationContext newOptContext, IRuleSetKind ruleSetKind);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java
index 7f2d3c8..3982171 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java
@@ -32,10 +32,18 @@
return counter;
}
+ public int getAndInc() {
+ return counter++;
+ }
+
public void inc() {
++counter;
}
+ public int incAndGet() {
+ return ++counter;
+ }
+
public void set(int newStart) {
counter = newStart;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 166ab9a..69ec210 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -93,4 +94,10 @@
public PlanStructureVerifier getPlanStructureVerifier();
public PlanStabilityVerifier getPlanStabilityVerifier();
+
+ void setCompilerFactory(Object factory);
+
+ Object getCompilerFactory();
+
+ IOptimizationContextFactory getOptimizationContextFactory();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 77fdd74..d350789 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -31,7 +31,9 @@
import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -56,11 +58,12 @@
IProjectionInfo<?> projectionInfo) throws AlgebricksException;
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
- throws AlgebricksException;
+ int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
+ RecordDescriptor inputDesc) throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc,
+ int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
+ IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
IResultMetadata metadata, JobSpecification spec) throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 138cff8..1f7c16c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -102,8 +102,9 @@
IPrinterFactory[] pf =
JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op), context, columns);
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
- resultOp.getDataSink(), columns, pf, inputDesc, resultOp.getResultMetadata(), spec);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints =
+ mp.getResultHandleRuntime(resultOp.getDataSink(), columns, pf, context.getWriterFactory(),
+ context.getResultSerializerFactoryProvider(), inputDesc, resultOp.getResultMetadata(), spec);
IOperatorDescriptor opDesc = runtimeAndConstraints.first;
opDesc.setSourceLocation(resultOp.getSourceLocation());
builder.contributeHyracksOperator(resultOp, opDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 3521a27..07c798f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -101,7 +101,7 @@
IMetadataProvider<?, ?> mp = context.getMetadataProvider();
Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints =
- mp.getWriteFileRuntime(write.getDataSink(), columns, pf, inputDesc);
+ mp.getWriteFileRuntime(write.getDataSink(), columns, pf, context.getWriterFactory(), inputDesc);
IPushRuntimeFactory runtime = runtimeAndConstraints.first;
runtime.setSourceLocation(write.getSourceLocation());
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 7c7d5a8..471380c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
@@ -42,6 +43,7 @@
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
@@ -57,6 +59,8 @@
private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
private final IPrinterFactoryProvider printerFactoryProvider;
+ private final IAWriterFactory writerFactory;
+ private final IResultSerializerFactoryProvider resultSerializerFactoryProvider;
private final ITypeTraitProvider typeTraitProvider;
private final IMetadataProvider<?, ?> metadataProvider;
private final IMissingWriterFactory missingWriterFactory;
@@ -86,6 +90,7 @@
IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
IBinaryBooleanInspectorFactory booleanInspectorFactory,
IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
+ IAWriterFactory writerFactory, IResultSerializerFactoryProvider resultSerializerFactoryProvider,
IMissingWriterFactory missingWriterFactory, IMissingWriterFactory nullWriterFactory,
IUnnestingPositionWriterFactory unnestingPositionWriterFactory,
INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
@@ -106,6 +111,8 @@
this.booleanInspectorFactory = booleanInspectorFactory;
this.integerInspectorFactory = integerInspectorFactory;
this.printerFactoryProvider = printerFactoryProvider;
+ this.writerFactory = writerFactory;
+ this.resultSerializerFactoryProvider = resultSerializerFactoryProvider;
this.clusterLocations = clusterLocations;
this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
this.missingWriterFactory = missingWriterFactory;
@@ -172,6 +179,14 @@
return printerFactoryProvider;
}
+ public IAWriterFactory getWriterFactory() {
+ return writerFactory;
+ }
+
+ public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+ return resultSerializerFactoryProvider;
+ }
+
public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() {
return predEvaluatorFactoryProvider;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index d1fd247..bee842c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -46,45 +46,29 @@
import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
/**
* The Algebricks default implementation for IOptimizationContext.
*/
-@SuppressWarnings({ "unchecked", "rawtypes" })
+@SuppressWarnings({ "rawtypes" })
public class AlgebricksOptimizationContext implements IOptimizationContext {
- private int varCounter;
+ private final IOptimizationContextFactory optContextFactory;
private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
private final IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
private final PhysicalOptimizationConfig physicalOptimizationConfig;
- private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() {
- Map<LogicalVariable, Integer> varSizeMap = new HashMap<>();
-
- @Override
- public void setVariableEvalSize(LogicalVariable var, int size) {
- varSizeMap.put(var, size);
- }
-
- @Override
- public int getVariableEvalSize(LogicalVariable var) {
- return varSizeMap.get(var);
- }
- };
-
- private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>();
-
- private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>();
- private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>();
- private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>();
-
- private IMetadataProvider metadataProvider;
- private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>();
-
+ private final VariableEvalSizeEnvironmentImpl varEvalSizeEnv = new VariableEvalSizeEnvironmentImpl();
+ private final Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>();
+ private final Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>();
+ private final Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>();
+ private final Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>();
+ private final HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>();
protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<>();
protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<>();
-
protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<>();
+
private final IExpressionTypeComputer expressionTypeComputer;
private final IMissableTypeComputer nullableTypeComputer;
private final INodeDomain defaultNodeDomain;
@@ -94,12 +78,18 @@
private final PlanStructureVerifier planStructureVerifier;
private final PlanStabilityVerifier planStabilityVerifier;
- public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ private int varCounter;
+ private IMetadataProvider metadataProvider;
+ private Object compilerFactory;
+
+ public AlgebricksOptimizationContext(IOptimizationContextFactory optContextFactory, int varCounter,
+ IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer,
IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations, IPlanPrettyPrinter prettyPrinter,
IWarningCollector warningCollector) {
+ this.optContextFactory = optContextFactory;
this.varCounter = varCounter;
this.expressionEvalSizeComputer = expressionEvalSizeComputer;
this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
@@ -115,6 +105,35 @@
this.planStabilityVerifier = isSanityCheckEnabled ? new PlanStabilityVerifier(prettyPrinter) : null;
}
+ public AlgebricksOptimizationContext(AlgebricksOptimizationContext from) {
+ optContextFactory = from.optContextFactory;
+ varCounter = from.varCounter;
+ expressionEvalSizeComputer = from.expressionEvalSizeComputer;
+ mergeAggregationExpressionFactory = from.mergeAggregationExpressionFactory;
+ expressionTypeComputer = from.expressionTypeComputer;
+ nullableTypeComputer = from.nullableTypeComputer;
+ physicalOptimizationConfig = from.physicalOptimizationConfig;
+ defaultNodeDomain = from.defaultNodeDomain;
+ prettyPrinter = from.prettyPrinter;
+ conflictingTypeResovler = from.conflictingTypeResovler;
+ warningCollector = NoOpWarningCollector.INSTANCE;
+ boolean isSanityCheckEnabled = physicalOptimizationConfig.isSanityCheckEnabled();
+ planStructureVerifier = isSanityCheckEnabled ? new PlanStructureVerifier(from.prettyPrinter, this) : null;
+ planStabilityVerifier = isSanityCheckEnabled ? new PlanStabilityVerifier(from.prettyPrinter) : null;
+ metadataProvider = from.metadataProvider;
+ compilerFactory = from.compilerFactory;
+
+ varEvalSizeEnv.varSizeMap.putAll(from.varEvalSizeEnv.varSizeMap);
+ typeEnvMap.putAll(from.typeEnvMap);
+ alreadyCompared.putAll(from.alreadyCompared);
+ dontApply.putAll(from.dontApply);
+ varToPrimaryKey.putAll(from.varToPrimaryKey);
+ notToBeInlinedVars.addAll(from.notToBeInlinedVars);
+ fdGlobalMap.putAll(from.fdGlobalMap);
+ eqClassGlobalMap.putAll(from.eqClassGlobalMap);
+ logicalProps.putAll(from.logicalProps);
+ }
+
@Override
public int getVarCounter() {
return varCounter;
@@ -354,4 +373,34 @@
public PlanStabilityVerifier getPlanStabilityVerifier() {
return planStabilityVerifier;
}
+
+ @Override
+ public IOptimizationContextFactory getOptimizationContextFactory() {
+ return optContextFactory;
+ }
+
+ @Override
+ public void setCompilerFactory(Object compilerFactory) {
+ this.compilerFactory = compilerFactory;
+ }
+
+ @Override
+ public Object getCompilerFactory() {
+ return compilerFactory;
+ }
+
+ protected static class VariableEvalSizeEnvironmentImpl implements IVariableEvalSizeEnvironment {
+
+ protected final Map<LogicalVariable, Integer> varSizeMap = new HashMap<>();
+
+ @Override
+ public void setVariableEvalSize(LogicalVariable var, int size) {
+ varSizeMap.put(var, size);
+ }
+
+ @Override
+ public int getVariableEvalSize(LogicalVariable var) {
+ return varSizeMap.get(var);
+ }
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
index 1c41e9a..c81025d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -28,10 +28,12 @@
import org.apache.hyracks.api.exceptions.IWarningCollector;
public interface IOptimizationContextFactory {
- public IOptimizationContext createOptimizationContext(int varCounter,
+ IOptimizationContext createOptimizationContext(int varCounter,
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
IConflictingTypeResolver conflictintTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector);
+
+ IOptimizationContext cloneOptimizationContext(IOptimizationContext oc);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java
new file mode 100644
index 0000000..5ce61ec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.rewriter.base;
+
+public interface IRuleSetKind {
+ String name();
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
index 7525180..75fd791 100644
--- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
@@ -32,10 +32,8 @@
* - A printer factory array to print the tuple containing different fields.
* @param writerFactory
* - A writer factory to write the serialized data to the print stream.
- * @param inputRecordDesc
- * - The record descriptor describing the input frame to be serialized.
* @return A new instance of result serialized appender.
*/
- public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields,
- IPrinterFactory[] printerFactories, IAWriterFactory writerFactory);
+ public IResultSerializerFactory getResultSerializerFactoryProvider(int[] fields, IPrinterFactory[] printerFactories,
+ IAWriterFactory writerFactory);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java
index e1b2e5d..1eda133 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java
@@ -47,7 +47,7 @@
}
DistributeResultOperator dop = (DistributeResultOperator) op;
IResultMetadata resultMetadata = dop.getResultMetadata();
- if (resultMetadata.getOutputTypes() != null) {
+ if (resultMetadata == null || resultMetadata.getOutputTypes() != null) {
return false;
}
List<Mutable<ILogicalExpression>> exprList = dop.getExpressions();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
index 763e6ff..90fa824 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
@@ -40,7 +40,7 @@
}
@Override
- public IResultSerializerFactory getAqlResultSerializerFactoryProvider(final int[] fields,
+ public IResultSerializerFactory getResultSerializerFactoryProvider(final int[] fields,
final IPrinterFactory[] printerFactories, final IAWriterFactory writerFactory) {
return new IResultSerializerFactory() {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
index bc7634d..8ce4f2b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
@@ -32,7 +32,15 @@
public class SerializedDataWriterFactory implements IAWriterFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ public static final SerializedDataWriterFactory WITHOUT_RECORD_DESCRIPTOR = new SerializedDataWriterFactory(false);
+
+ private final boolean writeRecordDescriptor;
+
+ public SerializedDataWriterFactory(boolean writeRecordDescriptor) {
+ this.writeRecordDescriptor = writeRecordDescriptor;
+ }
@Override
public IAWriter createWriter(final int[] fields, final PrintStream ps, IPrinterFactory[] printerFactories,
@@ -41,15 +49,17 @@
@Override
public void init() throws HyracksDataException {
- // dump the SerializerDeserializers to disk
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(inputRecordDescriptor);
- baos.writeTo(ps);
- oos.close();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
+ if (writeRecordDescriptor) {
+ // dump the SerializerDeserializers to disk
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(inputRecordDescriptor);
+ baos.writeTo(ps);
+ oos.close();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java
new file mode 100644
index 0000000..caf2464
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.exceptions;
+
+public final class NoOpWarningCollector implements IWarningCollector {
+
+ public static final IWarningCollector INSTANCE = new NoOpWarningCollector();
+
+ private NoOpWarningCollector() {
+ }
+
+ @Override
+ public void warn(Warning warning) {
+ // no-op
+ }
+
+ @Override
+ public boolean shouldWarn() {
+ return false;
+ }
+
+ @Override
+ public long getTotalWarningsCount() {
+ return 0;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index 3f78234..71c81a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -38,7 +38,7 @@
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -52,22 +52,7 @@
public class TestUtils {
private static final int DEFAULT_FRAME_SIZE = 32768;
- public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() {
- @Override
- public void warn(Warning warning) {
- // no-op
- }
-
- @Override
- public boolean shouldWarn() {
- return false;
- }
-
- @Override
- public long getTotalWarningsCount() {
- return 0;
- }
- };
+ public static final IWarningCollector NOOP_WARNING_COLLECTOR = NoOpWarningCollector.INSTANCE;
public static IHyracksTaskContext createHyracksTask() {
return create(DEFAULT_FRAME_SIZE);