Merged fullstack_lsm_staging upto r3336
git-svn-id: https://hyracks.googlecode.com/svn/trunk@3339 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index edc1b66..2b24bd0 100644
--- a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -29,6 +29,7 @@
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobSpecification;
public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuilder {
@@ -81,7 +82,8 @@
}
@Override
- public JobSpecification createJob(Object appContext) throws AlgebricksException {
+ public JobSpecification createJob(Object appContext,
+ IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
JobGenContext context = new JobGenContext(null, metadata, appContext,
serializerDeserializerProvider, hashFunctionFactoryProvider,
@@ -91,7 +93,7 @@
expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
partialAggregationTypeComputer, frameSize, clusterLocations);
PlanCompiler pc = new PlanCompiler(context);
- return pc.compilePlan(plan, null);
+ return pc.compilePlan(plan, null, jobEventListenerFactory);
}
};
}
diff --git a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/ICompiler.java b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/ICompiler.java
index 9146722..517ca6b 100644
--- a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/ICompiler.java
+++ b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/ICompiler.java
@@ -15,10 +15,12 @@
package edu.uci.ics.hyracks.algebricks.compiler.api;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobSpecification;
public interface ICompiler {
public void optimize() throws AlgebricksException;
- public JobSpecification createJob(Object appContext) throws AlgebricksException;
+ public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory)
+ throws AlgebricksException;
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 32cfb9a..0efb5ff 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -43,7 +43,9 @@
WRITE_RESULT,
INSERT_DELETE,
INDEX_INSERT_DELETE,
- UPDATE,
+ UPDATE,
INVERTED_INDEX_SEARCH,
- PARTITIONINGSPLIT
+ FUZZY_INVERTED_INDEX_SEARCH,
+ PARTITIONINGSPLIT,
+ EXTENSION_OPERATOR
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 82187e3..73da206 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -42,8 +42,8 @@
*/
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException;
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context,
+ JobSpecification jobSpec, Object implConfig) throws AlgebricksException;
public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
@@ -60,12 +60,14 @@
JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+ IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+ IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException;
/**
* Creates the insert runtime of IndexInsertDeletePOperator, which models
@@ -140,4 +142,5 @@
public IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) throws AlgebricksException;
public IFunctionInfo lookupFunction(FunctionIdentifier fid);
+
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index c5f4c71..545d039 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -104,7 +104,7 @@
if (p.getRoots().size() > 1) {
throw new NotImplementedException("Nested plans with several roots are not supported.");
}
- JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema);
+ JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null);
ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue();
JobGenContext context = pc.getContext();
IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
index a2dac3f..526c37f 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
@@ -37,11 +37,20 @@
public class DataSourceScanPOperator extends AbstractScanPOperator {
private IDataSource<?> dataSource;
+ private Object implConfig;
public DataSourceScanPOperator(IDataSource<?> dataSource) {
this.dataSource = dataSource;
}
+ public void setImplConfig(Object implConfig) {
+ this.implConfig = implConfig;
+ }
+
+ public Object getImplConfig() {
+ return implConfig;
+ }
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.DATASOURCE_SCAN;
@@ -71,7 +80,7 @@
List<LogicalVariable> vars = scan.getVariables();
List<LogicalVariable> projectVars = scan.getProjectVariables();
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = mp.getScannerRuntime(dataSource, vars,
- projectVars, scan.isProjectPushed(), opSchema, typeEnv, context, builder.getJobSpec());
+ projectVars, scan.isProjectPushed(), opSchema, typeEnv, context, builder.getJobSpec(), implConfig);
builder.contributeHyracksOperator(scan, p.first);
if (p.second != null) {
builder.contributeAlgebricksPartitionConstraint(p.first, p.second);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 563fcc5..b09c194 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -87,7 +86,8 @@
Object t = env.getVarType(v);
comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
}
- RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IOperatorDescriptor opDesc = null;
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
index 477d257..3e2827b 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -11,6 +11,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -70,17 +71,19 @@
throws AlgebricksException {
InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
IMetadataProvider mp = context.getMetadataProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
JobSpecification spec = builder.getJobSpec();
- RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0],
- context);
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
- if (insertDeleteOp.getOperation() == Kind.INSERT)
- runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, keys, payload, inputDesc,
- context, spec);
- else
- runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, keys, payload, inputDesc,
- context, spec);
+ if (insertDeleteOp.getOperation() == Kind.INSERT) {
+ runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ inputDesc, context, spec);
+ } else {
+ runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ inputDesc, context, spec);
+ }
builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index d153f90..5e8b59e 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -57,8 +57,13 @@
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
/**
+ * <<<<<<< .working
* Left input is broadcast and preserves its local properties. Right input can
* be partitioned in any way.
+ * =======
+ * Left input is broadcast and preserves its local properties.
+ * Right input can be partitioned in any way.
+ * >>>>>>> .merge-right.r3014
*/
public class NLJoinPOperator extends AbstractJoinPOperator {
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index 52f8e0b..a4642bb 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -61,7 +61,8 @@
StringStreamingScriptDescription sssd = (StringStreamingScriptDescription) scriptDesc;
StringStreamingRuntimeFactory runtime = new StringStreamingRuntimeFactory(sssd.getCommand(),
sssd.getPrinterFactories(), sssd.getFieldDelimiter(), sssd.getParserFactory());
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+ context);
builder.contributeMicroOperator(scriptOp, runtime, recDesc);
// and contribute one edge from its child
ILogicalOperator src = scriptOp.getInputs().get(0).getValue();
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/config/AlgebricksConfig.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/config/AlgebricksConfig.java
index 11ec043..061dbe4 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -20,20 +20,4 @@
public static final boolean DEBUG = true;
public static final String ALGEBRICKS_LOGGER_NAME = "edu.uci.ics.hyracks.algebricks";
public static final Logger ALGEBRICKS_LOGGER = Logger.getLogger(ALGEBRICKS_LOGGER_NAME);
- public static final String HYRACKS_APP_NAME = "algebricks";
-
- // public static final Level ALGEBRICKS_LOG_LEVEL = Level.FINEST;
- //
- // static {
- // Handler h;
- // try {
- // h = new ConsoleHandler();
- // h.setFormatter(new SysoutFormatter());
- // } catch (Exception e) {
- // h = new ConsoleHandler();
- // }
- // h.setLevel(ALGEBRICKS_LOG_LEVEL);
- // ALGEBRICKS_LOGGER.addHandler(h);
- // ALGEBRICKS_LOGGER.setLevel(ALGEBRICKS_LOG_LEVEL);
- // }
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 63a6852..b77d65d 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -42,8 +43,11 @@
return context;
}
- public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema) throws AlgebricksException {
+ public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
JobSpecification spec = new JobSpecification();
+ if (jobEventListenerFactory != null) {
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
+ }
List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>();
IHyracksJobBuilder builder = new JobBuilder(spec, context.getClusterLocations());
for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml b/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
index d4d6c89..d3314b5 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -49,6 +49,41 @@
</configuration>
</plugin>
</plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.codehaus.mojo
+ </groupId>
+ <artifactId>
+ javacc-maven-plugin
+ </artifactId>
+ <versionRange>
+ [2.6,)
+ </versionRange>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
<dependencies>
<dependency>
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
index 2981157..6c2700d 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
@@ -163,7 +163,7 @@
LOGGER.info("Optimized Plan:");
LOGGER.info(getPrettyPrintedPlan(plan));
}
- return compiler.createJob(null);
+ return compiler.createJob(null, null);
}
private ILogicalPlan translate(List<ASTNode> ast) throws PigletException {
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 15b290e..9caa3e3 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -62,13 +62,10 @@
@SuppressWarnings("unchecked")
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
- IDataSource<String> dataSource,
- List<LogicalVariable> scanVariables,
- List<LogicalVariable> projectVariables, boolean projectPushed,
- IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
- JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException {
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<String> dataSource,
+ List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context,
+ JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
PigletFileDataSource ds = (PigletFileDataSource) dataSource;
FileSplit[] fileSplits = ds.getFileSplits();
@@ -160,22 +157,6 @@
}
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<String> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
@@ -194,9 +175,28 @@
// TODO Auto-generated method stub
return null;
}
-
+
@Override
public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
return FN_MAP.get(fid);
}
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
+ IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<String> dataSource,
+ IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
new file mode 100644
index 0000000..3260ca0
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * If there is any ordering property before the subplan operator, the ordering should
+ * be kept after the subplan.
+ * This rule adds a redundant order operator after those cases, to guarantee the correctness.
+ *
+ * @author yingyib
+ */
+public class EnforceOrderByAfterSubplan implements IAlgebraicRewriteRule {
+ /** a set of order-breaking operators */
+ private final Set<LogicalOperatorTag> orderBreakingOps = new HashSet<LogicalOperatorTag>();
+
+ public EnforceOrderByAfterSubplan() {
+ /** add operators that break the ordering */
+ orderBreakingOps.add(LogicalOperatorTag.INNERJOIN);
+ orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
+ orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+ if (context.checkIfInDontApplySet(this, op1)) {
+ return false;
+ }
+ List<Mutable<ILogicalOperator>> inputs = op1.getInputs();
+ context.addToDontApplySet(this, op1);
+ if (op1.getOperatorTag() == LogicalOperatorTag.ORDER || inputs == null) {
+ /**
+ * does not apply if
+ * 1. there is yet-another order operator on-top-of the subplan, because the downstream order operator's ordering will be broken anyway
+ * 2. the input operator(s) is null
+ */
+ return false;
+ }
+ boolean changed = false;
+ for (int i = 0; i < inputs.size(); i++) {
+ Mutable<ILogicalOperator> inputOpRef = inputs.get(i);
+ AbstractLogicalOperator op = (AbstractLogicalOperator) inputOpRef.getValue();
+ context.addToDontApplySet(this, op);
+ if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+ continue;
+ }
+
+ /**
+ * check the order operators whose ordering is not broken before the subplan operator, and then
+ * duplicate them on-top-of the subplan operator
+ */
+ boolean foundTarget = true;
+ AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ while (child.getOperatorTag() != LogicalOperatorTag.ORDER) {
+ context.addToDontApplySet(this, child);
+ if (orderBreakingOps.contains(child.getOperatorTag())) {
+ foundTarget = false;
+ break;
+ }
+ List<Mutable<ILogicalOperator>> childInputs = child.getInputs();
+ if (childInputs == null || childInputs.size() > 2 || childInputs.size() < 1) {
+ foundTarget = false;
+ break;
+ } else {
+ child = (AbstractLogicalOperator) childInputs.get(0).getValue();
+ }
+ }
+ /** the target order-by operator has not been found. */
+ if (!foundTarget) {
+ return false;
+ }
+
+ /** duplicate the order-by operator and insert on-top-of the subplan operator */
+ context.addToDontApplySet(this, child);
+ OrderOperator sourceOrderOp = (OrderOperator) child;
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = deepCopyOrderAndExpression(sourceOrderOp
+ .getOrderExpressions());
+ OrderOperator newOrderOp = new OrderOperator(orderExprs);
+ context.addToDontApplySet(this, newOrderOp);
+ inputs.set(i, new MutableObject<ILogicalOperator>(newOrderOp));
+ newOrderOp.getInputs().add(inputOpRef);
+ context.computeAndSetTypeEnvironmentForOperator(newOrderOp);
+ changed = true;
+ }
+ return changed;
+ }
+
+ private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExpr) {
+ return new MutableObject<ILogicalExpression>(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
+ }
+
+ private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+ for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs)
+ newOrdersAndExprs.add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first,
+ deepCopyExpressionRef(pair.second)));
+ return newOrdersAndExprs;
+ }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 7e27b79..0d20f28 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -54,7 +54,8 @@
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT) {
+ if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
+ && op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
if (!roots.contains(op))
@@ -66,7 +67,8 @@
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT) {
+ if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
+ && op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
boolean rewritten = false;
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
index 796ef0a..508ce5d 100644
--- a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
@@ -72,6 +72,8 @@
}
} else {
throw new HyracksDataException("injected failure");
+// System.out.println("Injected Kill-JVM");
+// System.exit(-1);
}
}
}
diff --git a/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index dad7cd0..2855eb2 100644
--- a/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -71,7 +71,6 @@
nc2.start();
hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
- hcc.createApplication(AlgebricksConfig.HYRACKS_APP_NAME, null);
}
public static void deinit() throws Exception {
@@ -82,7 +81,7 @@
public static void runJob(JobSpecification spec) throws Exception {
AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
- JobId jobId = hcc.startJob(AlgebricksConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
hcc.waitForCompletion(jobId);
}