Merged fullstack_lsm_staging upto r3336

git-svn-id: https://hyracks.googlecode.com/svn/trunk@3339 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index edc1b66..2b24bd0 100644
--- a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
 public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuilder {
@@ -81,7 +82,8 @@
                     }
 
                     @Override
-                    public JobSpecification createJob(Object appContext) throws AlgebricksException {
+                    public JobSpecification createJob(Object appContext,
+                            IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
                         AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
                         JobGenContext context = new JobGenContext(null, metadata, appContext,
                                 serializerDeserializerProvider, hashFunctionFactoryProvider,
@@ -91,7 +93,7 @@
                                 expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
                                 partialAggregationTypeComputer, frameSize, clusterLocations);
                         PlanCompiler pc = new PlanCompiler(context);
-                        return pc.compilePlan(plan, null);
+                        return pc.compilePlan(plan, null, jobEventListenerFactory);
                     }
                 };
             }
diff --git a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/ICompiler.java b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/ICompiler.java
index 9146722..517ca6b 100644
--- a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/ICompiler.java
+++ b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/ICompiler.java
@@ -15,10 +15,12 @@
 package edu.uci.ics.hyracks.algebricks.compiler.api;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
 public interface ICompiler {
     public void optimize() throws AlgebricksException;
 
-    public JobSpecification createJob(Object appContext) throws AlgebricksException;
+    public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory)
+            throws AlgebricksException;
 }
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 32cfb9a..0efb5ff 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -43,7 +43,9 @@
     WRITE_RESULT,
     INSERT_DELETE,
     INDEX_INSERT_DELETE,
-	UPDATE,
+    UPDATE,
     INVERTED_INDEX_SEARCH,
-    PARTITIONINGSPLIT
+    FUZZY_INVERTED_INDEX_SEARCH,
+    PARTITIONINGSPLIT,
+    EXTENSION_OPERATOR
 }
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 82187e3..73da206 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -42,8 +42,8 @@
      */
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
-            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
-            throws AlgebricksException;
+            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context,
+            JobSpecification jobSpec, Object implConfig) throws AlgebricksException;
 
     public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
 
@@ -60,12 +60,14 @@
             JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
-            IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec)
+            throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
-            IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec)
+            throws AlgebricksException;
 
     /**
      * Creates the insert runtime of IndexInsertDeletePOperator, which models
@@ -140,4 +142,5 @@
     public IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) throws AlgebricksException;
 
     public IFunctionInfo lookupFunction(FunctionIdentifier fid);
+
 }
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index c5f4c71..545d039 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -104,7 +104,7 @@
         if (p.getRoots().size() > 1) {
             throw new NotImplementedException("Nested plans with several roots are not supported.");
         }
-        JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema);
+        JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null);
         ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue();
         JobGenContext context = pc.getContext();
         IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
index a2dac3f..526c37f 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
@@ -37,11 +37,20 @@
 public class DataSourceScanPOperator extends AbstractScanPOperator {
 
     private IDataSource<?> dataSource;
+    private Object implConfig;
 
     public DataSourceScanPOperator(IDataSource<?> dataSource) {
         this.dataSource = dataSource;
     }
 
+    public void setImplConfig(Object implConfig) {
+        this.implConfig = implConfig;
+    }
+
+    public Object getImplConfig() {
+        return implConfig;
+    }
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.DATASOURCE_SCAN;
@@ -71,7 +80,7 @@
         List<LogicalVariable> vars = scan.getVariables();
         List<LogicalVariable> projectVars = scan.getProjectVariables();
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = mp.getScannerRuntime(dataSource, vars,
-                projectVars, scan.isProjectPushed(), opSchema, typeEnv, context, builder.getJobSpec());
+                projectVars, scan.isProjectPushed(), opSchema, typeEnv, context, builder.getJobSpec(), implConfig);
         builder.contributeHyracksOperator(scan, p.first);
         if (p.second != null) {
             builder.contributeAlgebricksPartitionConstraint(p.first, p.second);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 563fcc5..b09c194 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -28,7 +28,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -87,7 +86,8 @@
             Object t = env.getVarType(v);
             comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
         }
-        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc = null;
 
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
index 477d257..3e2827b 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -11,6 +11,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -70,17 +71,19 @@
             throws AlgebricksException {
         InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
         IMetadataProvider mp = context.getMetadataProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
         JobSpecification spec = builder.getJobSpec();
-        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0],
-                context);
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
-        if (insertDeleteOp.getOperation() == Kind.INSERT)
-            runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, keys, payload, inputDesc,
-                    context, spec);
-        else
-            runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, keys, payload, inputDesc,
-                    context, spec);
+        if (insertDeleteOp.getOperation() == Kind.INSERT) {
+            runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                    inputDesc, context, spec);
+        } else {
+            runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                    inputDesc, context, spec);
+        }
 
         builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
         builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index d153f90..5e8b59e 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -57,8 +57,13 @@
 import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
 
 /**
+ * <<<<<<< .working
  * Left input is broadcast and preserves its local properties. Right input can
  * be partitioned in any way.
+ * =======
+ * Left input is broadcast and preserves its local properties.
+ * Right input can be partitioned in any way.
+ * >>>>>>> .merge-right.r3014
  */
 public class NLJoinPOperator extends AbstractJoinPOperator {
 
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index 52f8e0b..a4642bb 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -61,7 +61,8 @@
         StringStreamingScriptDescription sssd = (StringStreamingScriptDescription) scriptDesc;
         StringStreamingRuntimeFactory runtime = new StringStreamingRuntimeFactory(sssd.getCommand(),
                 sssd.getPrinterFactories(), sssd.getFieldDelimiter(), sssd.getParserFactory());
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
         builder.contributeMicroOperator(scriptOp, runtime, recDesc);
         // and contribute one edge from its child
         ILogicalOperator src = scriptOp.getInputs().get(0).getValue();
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/config/AlgebricksConfig.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/config/AlgebricksConfig.java
index 11ec043..061dbe4 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -20,20 +20,4 @@
     public static final boolean DEBUG = true;
     public static final String ALGEBRICKS_LOGGER_NAME = "edu.uci.ics.hyracks.algebricks";
     public static final Logger ALGEBRICKS_LOGGER = Logger.getLogger(ALGEBRICKS_LOGGER_NAME);
-    public static final String HYRACKS_APP_NAME = "algebricks";
-
-    // public static final Level ALGEBRICKS_LOG_LEVEL = Level.FINEST;
-    //
-    // static {
-    // Handler h;
-    // try {
-    // h = new ConsoleHandler();
-    // h.setFormatter(new SysoutFormatter());
-    // } catch (Exception e) {
-    // h = new ConsoleHandler();
-    // }
-    // h.setLevel(ALGEBRICKS_LOG_LEVEL);
-    // ALGEBRICKS_LOGGER.addHandler(h);
-    // ALGEBRICKS_LOGGER.setLevel(ALGEBRICKS_LOG_LEVEL);
-    // }
 }
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 63a6852..b77d65d 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
@@ -42,8 +43,11 @@
         return context;
     }
 
-    public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema) throws AlgebricksException {
+    public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
         JobSpecification spec = new JobSpecification();
+        if (jobEventListenerFactory != null) {
+            spec.setJobletEventListenerFactory(jobEventListenerFactory);
+        }
         List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>();
         IHyracksJobBuilder builder = new JobBuilder(spec, context.getClusterLocations());
         for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml b/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
index d4d6c89..d3314b5 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -49,6 +49,41 @@
 				</configuration>
 			</plugin>
 		</plugins>
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											org.codehaus.mojo
+										</groupId>
+										<artifactId>
+											javacc-maven-plugin
+										</artifactId>
+										<versionRange>
+											[2.6,)
+										</versionRange>
+										<goals>
+											<goal>javacc</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
 	</build>
 	<dependencies>
 		<dependency>
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
index 2981157..6c2700d 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
@@ -163,7 +163,7 @@
             LOGGER.info("Optimized Plan:");
             LOGGER.info(getPrettyPrintedPlan(plan));
         }
-        return compiler.createJob(null);
+        return compiler.createJob(null, null);
     }
 
     private ILogicalPlan translate(List<ASTNode> ast) throws PigletException {
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 15b290e..9caa3e3 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -62,13 +62,10 @@
 
     @SuppressWarnings("unchecked")
     @Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
-			IDataSource<String> dataSource,
-			List<LogicalVariable> scanVariables,
-			List<LogicalVariable> projectVariables, boolean projectPushed,
-			IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-			JobGenContext context, JobSpecification jobSpec)
-			throws AlgebricksException {
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<String> dataSource,
+            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context,
+            JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
         PigletFileDataSource ds = (PigletFileDataSource) dataSource;
 
         FileSplit[] fileSplits = ds.getFileSplits();
@@ -160,22 +157,6 @@
     }
 
     @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
-            IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<String> dataSource,
-            IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
             IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
@@ -194,9 +175,28 @@
         // TODO Auto-generated method stub
         return null;
     }
-    
+
     @Override
     public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
         return FN_MAP.get(fid);
     }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec)
+            throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<String> dataSource,
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec)
+            throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
new file mode 100644
index 0000000..3260ca0
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * If there is any ordering property before the subplan operator, the ordering should
+ * be kept after the subplan.
+ * This rule adds a redundant order operator after those cases, to guarantee the correctness.
+ * 
+ * @author yingyib
+ */
+public class EnforceOrderByAfterSubplan implements IAlgebraicRewriteRule {
+    /** a set of order-breaking operators */
+    private final Set<LogicalOperatorTag> orderBreakingOps = new HashSet<LogicalOperatorTag>();
+
+    public EnforceOrderByAfterSubplan() {
+        /** add operators that break the ordering */
+        orderBreakingOps.add(LogicalOperatorTag.INNERJOIN);
+        orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
+        orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op1)) {
+            return false;
+        }
+        List<Mutable<ILogicalOperator>> inputs = op1.getInputs();
+        context.addToDontApplySet(this, op1);
+        if (op1.getOperatorTag() == LogicalOperatorTag.ORDER || inputs == null) {
+            /**
+             * does not apply if
+             * 1. there is yet-another order operator on-top-of the subplan, because the downstream order operator's ordering will be broken anyway
+             * 2. the input operator(s) is null
+             */
+            return false;
+        }
+        boolean changed = false;
+        for (int i = 0; i < inputs.size(); i++) {
+            Mutable<ILogicalOperator> inputOpRef = inputs.get(i);
+            AbstractLogicalOperator op = (AbstractLogicalOperator) inputOpRef.getValue();
+            context.addToDontApplySet(this, op);
+            if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+                continue;
+            }
+
+            /**
+             * check the order operators whose ordering is not broken before the subplan operator, and then
+             * duplicate them on-top-of the subplan operator
+             */
+            boolean foundTarget = true;
+            AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+            while (child.getOperatorTag() != LogicalOperatorTag.ORDER) {
+                context.addToDontApplySet(this, child);
+                if (orderBreakingOps.contains(child.getOperatorTag())) {
+                    foundTarget = false;
+                    break;
+                }
+                List<Mutable<ILogicalOperator>> childInputs = child.getInputs();
+                if (childInputs == null || childInputs.size() > 2 || childInputs.size() < 1) {
+                    foundTarget = false;
+                    break;
+                } else {
+                    child = (AbstractLogicalOperator) childInputs.get(0).getValue();
+                }
+            }
+            /** the target order-by operator has not been found. */
+            if (!foundTarget) {
+                return false;
+            }
+
+            /** duplicate the order-by operator and insert on-top-of the subplan operator */
+            context.addToDontApplySet(this, child);
+            OrderOperator sourceOrderOp = (OrderOperator) child;
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = deepCopyOrderAndExpression(sourceOrderOp
+                    .getOrderExpressions());
+            OrderOperator newOrderOp = new OrderOperator(orderExprs);
+            context.addToDontApplySet(this, newOrderOp);
+            inputs.set(i, new MutableObject<ILogicalOperator>(newOrderOp));
+            newOrderOp.getInputs().add(inputOpRef);
+            context.computeAndSetTypeEnvironmentForOperator(newOrderOp);
+            changed = true;
+        }
+        return changed;
+    }
+
+    private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExpr) {
+        return new MutableObject<ILogicalExpression>(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
+    }
+
+    private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+        for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs)
+            newOrdersAndExprs.add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first,
+                    deepCopyExpressionRef(pair.second)));
+        return newOrdersAndExprs;
+    }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 7e27b79..0d20f28 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -54,7 +54,8 @@
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT) {
+        if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
+                && op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
         if (!roots.contains(op))
@@ -66,7 +67,8 @@
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT) {
+        if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
+                && op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
         boolean rewritten = false;
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
index 796ef0a..508ce5d 100644
--- a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
@@ -72,6 +72,8 @@
                         }
                     } else {
                         throw new HyracksDataException("injected failure");
+//                    	System.out.println("Injected Kill-JVM");
+//                    	System.exit(-1);
                     }
                 }
             }
diff --git a/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index dad7cd0..2855eb2 100644
--- a/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -71,7 +71,6 @@
         nc2.start();
 
         hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
-        hcc.createApplication(AlgebricksConfig.HYRACKS_APP_NAME, null);
     }
 
     public static void deinit() throws Exception {
@@ -82,7 +81,7 @@
 
     public static void runJob(JobSpecification spec) throws Exception {
         AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
-        JobId jobId = hcc.startJob(AlgebricksConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
         hcc.waitForCompletion(jobId);
     }