reverse integrate super-activity-rewriting into fullstack_asterix_stabilization
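
This merges the changes from the super-activity-rewriting branch into
fullstack_asterix_stabilization. In summary: every algebricks module pom adds
<fork>true</fork> to its maven-compiler-plugin configuration, which runs javac
in a forked process rather than inside the Maven JVM (algebricks-tests also
sets <encoding>UTF-8</encoding>); the hivesterix pom moves from source/target
1.6 to 1.7 with UTF-8 encoding and forked compilation, gains a maven-jar-plugin
execution that packages the org/apache classes into an "a-hive" patch jar
during the package phase, and its binary assembly now also bundles target/*.jar
into lib; HiveAlgebricksTranslator.java is reformatted from 4-space to tab
indentation with lines rewrapped, and its printOperatorSchema debugging output
is commented out. The recurring compiler-plugin change is sketched below for
reference; the enclosing <plugin> element is not visible in the algebricks
hunks and is inferred from the hivesterix hunk:

    <plugin>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.6</source>
        <target>1.6</target>
        <!-- compile in a separate javac process, not in the Maven JVM -->
        <fork>true</fork>
      </configuration>
    </plugin>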

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3035 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-common/pom.xml b/algebricks/algebricks-common/pom.xml
index a5677a5..ad2e8f7 100644
--- a/algebricks/algebricks-common/pom.xml
+++ b/algebricks/algebricks-common/pom.xml
@@ -18,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-compiler/pom.xml b/algebricks/algebricks-compiler/pom.xml
index 8dc083d..9d2a115 100644
--- a/algebricks/algebricks-compiler/pom.xml
+++ b/algebricks/algebricks-compiler/pom.xml
@@ -18,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-core/pom.xml b/algebricks/algebricks-core/pom.xml
index a74a540..1166842 100644
--- a/algebricks/algebricks-core/pom.xml
+++ b/algebricks/algebricks-core/pom.xml
@@ -18,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-data/pom.xml b/algebricks/algebricks-data/pom.xml
index 3d927d9..04fc66f 100644
--- a/algebricks/algebricks-data/pom.xml
+++ b/algebricks/algebricks-data/pom.xml
@@ -18,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml
index 954938a..0cf1bab 100644
--- a/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -18,6 +18,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/algebricks/algebricks-rewriter/pom.xml b/algebricks/algebricks-rewriter/pom.xml
index 41979d3..9ba1603 100644
--- a/algebricks/algebricks-rewriter/pom.xml
+++ b/algebricks/algebricks-rewriter/pom.xml
@@ -18,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-runtime/pom.xml b/algebricks/algebricks-runtime/pom.xml
index e40dfb0..458b5ee 100644
--- a/algebricks/algebricks-runtime/pom.xml
+++ b/algebricks/algebricks-runtime/pom.xml
@@ -18,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-tests/pom.xml b/algebricks/algebricks-tests/pom.xml
index 19e6711..a62398c 100644
--- a/algebricks/algebricks-tests/pom.xml
+++ b/algebricks/algebricks-tests/pom.xml
@@ -18,7 +18,9 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
-        </configuration>
+          <fork>true</fork>
+          <encoding>UTF-8</encoding>
+        </configuration>
       </plugin>
       <plugin>
         <artifactId>maven-antrun-plugin</artifactId>
diff --git a/hivesterix/pom.xml b/hivesterix/pom.xml
index cf9c4b5..1553a2b 100644
--- a/hivesterix/pom.xml
+++ b/hivesterix/pom.xml
@@ -4,7 +4,7 @@
 	<groupId>edu.uci.ics.hivesterix</groupId>
 	<artifactId>hivesterix</artifactId>
 	<version>0.2.3-SNAPSHOT</version>
- 	<name>hivesterix</name>
+	<name>hivesterix</name>
 	<dependencies>
 		<dependency>
 			<groupId>javax.servlet</groupId>
@@ -308,11 +308,32 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<encoding>UTF-8</encoding>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>patch</id>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<phase>package</phase>
+						<configuration>
+							<classifier>patch</classifier>
+							<finalName>a-hive</finalName>
+							<includes>
+								<include>**/org/apache/**</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
 				<groupId>org.codehaus.mojo</groupId>
 				<artifactId>appassembler-maven-plugin</artifactId>
 				<version>1.3</version>
@@ -358,8 +379,8 @@
 				<version>2.13</version>
 				<configuration>
 					<forkMode>pertest</forkMode>
-					<argLine>-enableassertions -Xmx2047m -Dfile.encoding=UTF-8 
-                        -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<argLine>-enableassertions -Xmx2047m -Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
 					<includes>
 						<include>**/test/optimizer/*TestSuite.java</include>
 						<include>**/test/optimizer/*Test.java</include>
diff --git a/hivesterix/src/main/assembly/binary-assembly.xml b/hivesterix/src/main/assembly/binary-assembly.xml
index 0500499..de3757f 100755
--- a/hivesterix/src/main/assembly/binary-assembly.xml
+++ b/hivesterix/src/main/assembly/binary-assembly.xml
@@ -1,19 +1,26 @@
 <assembly>
-  <id>binary-assembly</id>
-  <formats>
-    <format>zip</format>
-    <format>dir</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>target/appassembler/bin</directory>
-      <outputDirectory>bin</outputDirectory>
-      <fileMode>0755</fileMode>
-    </fileSet>
-    <fileSet>
-      <directory>target/appassembler/lib</directory>
-      <outputDirectory>lib</outputDirectory>
-    </fileSet>
-  </fileSets>
+	<id>binary-assembly</id>
+	<formats>
+		<format>zip</format>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>target/appassembler/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>target/appassembler/lib</directory>
+			<outputDirectory>lib</outputDirectory>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>lib</outputDirectory>
+			<includes>
+				<include>*.jar</include>
+			</includes>
+		</fileSet>
+	</fileSets>
 </assembly>
diff --git a/hivesterix/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java b/hivesterix/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
index 1fb973e..2765e44 100644
--- a/hivesterix/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
+++ b/hivesterix/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
@@ -80,729 +80,770 @@
 @SuppressWarnings("rawtypes")

 public class HiveAlgebricksTranslator implements Translator {

 

-    private int currentVariable = 0;

+	private int currentVariable = 0;

 

-    private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();

+	private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();

 

-    private boolean continueTraverse = true;

+	private boolean continueTraverse = true;

 

-    private IMetadataProvider<PartitionDesc, Object> metaData;

+	private IMetadataProvider<PartitionDesc, Object> metaData;

 

-    /**

-     * map variable name to the logical variable

-     */

-    private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();

+	/**

+	 * map variable name to the logical variable

+	 */

+	private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();

 

-    /**

-     * map field name to LogicalVariable

-     */

-    private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();

+	/**

+	 * map field name to LogicalVariable

+	 */

+	private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();

 

-    /**

-     * map logical variable to name

-     */

-    private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();

+	/**

+	 * map logical variable to name

+	 */

+	private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();

 

-    /**

-     * asterix root operators

-     */

-    private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();

+	/**

+	 * asterix root operators

+	 */

+	private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();

 

-    /**

-     * a list of visitors

-     */

-    private List<Visitor> visitors = new ArrayList<Visitor>();

+	/**

+	 * a list of visitors

+	 */

+	private List<Visitor> visitors = new ArrayList<Visitor>();

 

-    /**

-     * output writer to print things out

-     */

-    private static PrintWriter outputWriter = new PrintWriter(new OutputStreamWriter(System.out));

+	/**

+	 * output writer to print things out

+	 */

+	private static PrintWriter outputWriter = new PrintWriter(

+			new OutputStreamWriter(System.out));

 

-    /**

-     * map a logical variable to type info

-     */

-    private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();

+	/**

+	 * map a logical variable to type info

+	 */

+	private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();

 

-    @Override

-    public LogicalVariable getVariable(String fieldName, TypeInfo type) {

-        LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);

-        if (var == null) {

-            currentVariable++;

-            var = new LogicalVariable(currentVariable);

-            fieldToLogicalVariableMap.put(fieldName, var);

-            nameToLogicalVariableMap.put(var.toString(), var);

-            variableToType.put(var, type);

-            logicalVariableToFieldMap.put(var, fieldName);

-        }

-        return var;

-    }

+	@Override

+	public LogicalVariable getVariable(String fieldName, TypeInfo type) {

+		LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);

+		if (var == null) {

+			currentVariable++;

+			var = new LogicalVariable(currentVariable);

+			fieldToLogicalVariableMap.put(fieldName, var);

+			nameToLogicalVariableMap.put(var.toString(), var);

+			variableToType.put(var, type);

+			logicalVariableToFieldMap.put(var, fieldName);

+		}

+		return var;

+	}

 

-    @Override

-    public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {

-        currentVariable++;

-        LogicalVariable var = new LogicalVariable(currentVariable);

-        fieldToLogicalVariableMap.put(fieldName, var);

-        nameToLogicalVariableMap.put(var.toString(), var);

-        variableToType.put(var, type);

-        logicalVariableToFieldMap.put(var, fieldName);

-        return var;

-    }

+	@Override

+	public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {

+		currentVariable++;

+		LogicalVariable var = new LogicalVariable(currentVariable);

+		fieldToLogicalVariableMap.put(fieldName, var);

+		nameToLogicalVariableMap.put(var.toString(), var);

+		variableToType.put(var, type);

+		logicalVariableToFieldMap.put(var, fieldName);

+		return var;

+	}

 

-    @Override

-    public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {

-        String name = this.logicalVariableToFieldMap.get(oldVar);

-        if (name != null) {

-            fieldToLogicalVariableMap.put(name, newVar);

-            nameToLogicalVariableMap.put(newVar.toString(), newVar);

-            nameToLogicalVariableMap.put(oldVar.toString(), newVar);

-            logicalVariableToFieldMap.put(newVar, name);

-        }

-    }

+	@Override

+	public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {

+		String name = this.logicalVariableToFieldMap.get(oldVar);

+		if (name != null) {

+			fieldToLogicalVariableMap.put(name, newVar);

+			nameToLogicalVariableMap.put(newVar.toString(), newVar);

+			nameToLogicalVariableMap.put(oldVar.toString(), newVar);

+			logicalVariableToFieldMap.put(newVar, name);

+		}

+	}

 

-    @Override

-    public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {

-        return metaData;

-    }

+	@Override

+	public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {

+		return metaData;

+	}

 

-    /**

-     * only get an variable, without rewriting it

-     * 

-     * @param fieldName

-     * @return

-     */

-    private LogicalVariable getVariableOnly(String fieldName) {

-        return fieldToLogicalVariableMap.get(fieldName);

-    }

+	/**

+	 * only get an variable, without rewriting it

+	 * 

+	 * @param fieldName

+	 * @return

+	 */

+	private LogicalVariable getVariableOnly(String fieldName) {

+		return fieldToLogicalVariableMap.get(fieldName);

+	}

 

-    private void updateVariable(String fieldName, LogicalVariable variable) {

-        LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);

-        if (var == null) {

-            fieldToLogicalVariableMap.put(fieldName, variable);

-            nameToLogicalVariableMap.put(fieldName, variable);

-        } else if (!var.equals(variable)) {

-            // System.out.println("!!!replace variables!!!");

-            fieldToLogicalVariableMap.put(fieldName, variable);

-            nameToLogicalVariableMap.put(fieldName, variable);

-        }

-    }

+	private void updateVariable(String fieldName, LogicalVariable variable) {

+		LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);

+		if (var == null) {

+			fieldToLogicalVariableMap.put(fieldName, variable);

+			nameToLogicalVariableMap.put(fieldName, variable);

+		} else if (!var.equals(variable)) {

+			fieldToLogicalVariableMap.put(fieldName, variable);

+			nameToLogicalVariableMap.put(fieldName, variable);

+		}

+	}

 

-    /**

-     * get a list of logical variables from the schema

-     * 

-     * @param schema

-     * @return

-     */

-    @Override

-    public List<LogicalVariable> getVariablesFromSchema(Schema schema) {

-        List<LogicalVariable> variables = new ArrayList<LogicalVariable>();

-        List<String> names = schema.getNames();

+	/**

+	 * get a list of logical variables from the schema

+	 * 

+	 * @param schema

+	 * @return

+	 */

+	@Override

+	public List<LogicalVariable> getVariablesFromSchema(Schema schema) {

+		List<LogicalVariable> variables = new ArrayList<LogicalVariable>();

+		List<String> names = schema.getNames();

 

-        for (String name : names)

-            variables.add(nameToLogicalVariableMap.get(name));

-        return variables;

-    }

+		for (String name : names)

+			variables.add(nameToLogicalVariableMap.get(name));

+		return variables;

+	}

 

-    /**

-     * get variable to typeinfo map

-     * 

-     * @return

-     */

-    public HashMap<LogicalVariable, TypeInfo> getVariableContext() {

-        return this.variableToType;

-    }

+	/**

+	 * get variable to typeinfo map

+	 * 

+	 * @return

+	 */

+	public HashMap<LogicalVariable, TypeInfo> getVariableContext() {

+		return this.variableToType;

+	}

 

-    /**

-     * get the number of variables

-     * s

-     * 

-     * @return

-     */

-    public int getVariableCounter() {

-        return currentVariable + 1;

-    }

+	/**

+	 * get the number of variables s

+	 * 

+	 * @return

+	 */

+	public int getVariableCounter() {

+		return currentVariable + 1;

+	}

 

-    /**

-     * translate from hive operator tree to asterix operator tree

-     * 

-     * @param hive

-     *            roots

-     * @return Algebricks roots

-     */

-    public void translate(List<Operator> hiveRoot, ILogicalOperator parentOperator,

-            HashMap<String, PartitionDesc> aliasToPathMap) throws AlgebricksException {

-        /**

-         * register visitors

-         */

-        visitors.add(new FilterVisitor());

-        visitors.add(new GroupByVisitor());

-        visitors.add(new JoinVisitor());

-        visitors.add(new LateralViewJoinVisitor());

-        visitors.add(new UnionVisitor());

-        visitors.add(new LimitVisitor());

-        visitors.add(new MapJoinVisitor());

-        visitors.add(new ProjectVisitor());

-        visitors.add(new SortVisitor());

-        visitors.add(new ExtractVisitor());

-        visitors.add(new TableScanWriteVisitor(aliasToPathMap));

+	/**

+	 * translate from hive operator tree to asterix operator tree

+	 * 

+	 * @param hive

+	 *            roots

+	 * @return Algebricks roots

+	 */

+	public void translate(List<Operator> hiveRoot,

+			ILogicalOperator parentOperator,

+			HashMap<String, PartitionDesc> aliasToPathMap)

+			throws AlgebricksException {

+		/**

+		 * register visitors

+		 */

+		visitors.add(new FilterVisitor());

+		visitors.add(new GroupByVisitor());

+		visitors.add(new JoinVisitor());

+		visitors.add(new LateralViewJoinVisitor());

+		visitors.add(new UnionVisitor());

+		visitors.add(new LimitVisitor());

+		visitors.add(new MapJoinVisitor());

+		visitors.add(new ProjectVisitor());

+		visitors.add(new SortVisitor());

+		visitors.add(new ExtractVisitor());

+		visitors.add(new TableScanWriteVisitor(aliasToPathMap));

 

-        List<Mutable<ILogicalOperator>> refList = translate(hiveRoot, new MutableObject<ILogicalOperator>(

-                parentOperator));

-        insertReplicateOperator(refList);

-        if (refList != null)

-            rootOperators.addAll(refList);

-    }

+		List<Mutable<ILogicalOperator>> refList = translate(hiveRoot,

+				new MutableObject<ILogicalOperator>(parentOperator));

+		insertReplicateOperator(refList);

+		if (refList != null)

+			rootOperators.addAll(refList);

+	}

 

-    /**

-     * translate operator DAG

-     * 

-     * @param hiveRoot

-     * @param AlgebricksParentOperator

-     * @return

-     */

-    private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,

-            Mutable<ILogicalOperator> AlgebricksParentOperator) throws AlgebricksException {

+	/**

+	 * translate operator DAG

+	 * 

+	 * @param hiveRoot

+	 * @param AlgebricksParentOperator

+	 * @return

+	 */

+	private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,

+			Mutable<ILogicalOperator> AlgebricksParentOperator)

+			throws AlgebricksException {

 

-        for (Operator hiveOperator : hiveRoot) {

-            continueTraverse = true;

-            Mutable<ILogicalOperator> currentOperatorRef = null;

-            if (hiveOperator.getType() == OperatorType.FILTER) {

-                FilterOperator fop = (FilterOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.REDUCESINK) {

-                ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.JOIN) {

-                JoinOperator fop = (JoinOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null) {

-                        continueTraverse = true;

-                        break;

-                    } else

-                        continueTraverse = false;

-                }

-                if (currentOperatorRef == null)

-                    return null;

-            } else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {

-                LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-                if (currentOperatorRef == null)

-                    return null;

-            } else if (hiveOperator.getType() == OperatorType.MAPJOIN) {

-                MapJoinOperator fop = (MapJoinOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null) {

-                        continueTraverse = true;

-                        break;

-                    } else

-                        continueTraverse = false;

-                }

-                if (currentOperatorRef == null)

-                    return null;

-            } else if (hiveOperator.getType() == OperatorType.SELECT) {

-                SelectOperator fop = (SelectOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.EXTRACT) {

-                ExtractOperator fop = (ExtractOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.GROUPBY) {

-                GroupByOperator fop = (GroupByOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.TABLESCAN) {

-                TableScanOperator fop = (TableScanOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.FILESINK) {

-                FileSinkOperator fop = (FileSinkOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.LIMIT) {

-                LimitOperator lop = (LimitOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.UDTF) {

-                UDTFOperator lop = (UDTFOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null)

-                        break;

-                }

-            } else if (hiveOperator.getType() == OperatorType.UNION) {

-                UnionOperator lop = (UnionOperator) hiveOperator;

-                for (Visitor visitor : visitors) {

-                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);

-                    if (currentOperatorRef != null) {

-                        continueTraverse = true;

-                        break;

-                    } else

-                        continueTraverse = false;

-                }

-            } else

-                ;

-            if (hiveOperator.getChildOperators() != null && hiveOperator.getChildOperators().size() > 0

-                    && continueTraverse) {

-                @SuppressWarnings("unchecked")

-                List<Operator> children = hiveOperator.getChildOperators();

-                if (currentOperatorRef == null)

-                    currentOperatorRef = AlgebricksParentOperator;

-                translate(children, currentOperatorRef);

-            }

-            if (hiveOperator.getChildOperators() == null || hiveOperator.getChildOperators().size() == 0)

-                logicalOp.add(currentOperatorRef);

-        }

-        return logicalOp;

-    }

+		for (Operator hiveOperator : hiveRoot) {

+			continueTraverse = true;

+			Mutable<ILogicalOperator> currentOperatorRef = null;

+			if (hiveOperator.getType() == OperatorType.FILTER) {

+				FilterOperator fop = (FilterOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.REDUCESINK) {

+				ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.JOIN) {

+				JoinOperator fop = (JoinOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null) {

+						continueTraverse = true;

+						break;

+					} else

+						continueTraverse = false;

+				}

+				if (currentOperatorRef == null)

+					return null;

+			} else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {

+				LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+				if (currentOperatorRef == null)

+					return null;

+			} else if (hiveOperator.getType() == OperatorType.MAPJOIN) {

+				MapJoinOperator fop = (MapJoinOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null) {

+						continueTraverse = true;

+						break;

+					} else

+						continueTraverse = false;

+				}

+				if (currentOperatorRef == null)

+					return null;

+			} else if (hiveOperator.getType() == OperatorType.SELECT) {

+				SelectOperator fop = (SelectOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.EXTRACT) {

+				ExtractOperator fop = (ExtractOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.GROUPBY) {

+				GroupByOperator fop = (GroupByOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.TABLESCAN) {

+				TableScanOperator fop = (TableScanOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.FILESINK) {

+				FileSinkOperator fop = (FileSinkOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(fop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.LIMIT) {

+				LimitOperator lop = (LimitOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(lop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.UDTF) {

+				UDTFOperator lop = (UDTFOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(lop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null)

+						break;

+				}

+			} else if (hiveOperator.getType() == OperatorType.UNION) {

+				UnionOperator lop = (UnionOperator) hiveOperator;

+				for (Visitor visitor : visitors) {

+					currentOperatorRef = visitor.visit(lop,

+							AlgebricksParentOperator, this);

+					if (currentOperatorRef != null) {

+						continueTraverse = true;

+						break;

+					} else

+						continueTraverse = false;

+				}

+			} else

+				;

+			if (hiveOperator.getChildOperators() != null

+					&& hiveOperator.getChildOperators().size() > 0

+					&& continueTraverse) {

+				@SuppressWarnings("unchecked")

+				List<Operator> children = hiveOperator.getChildOperators();

+				if (currentOperatorRef == null)

+					currentOperatorRef = AlgebricksParentOperator;

+				translate(children, currentOperatorRef);

+			}

+			if (hiveOperator.getChildOperators() == null

+					|| hiveOperator.getChildOperators().size() == 0)

+				logicalOp.add(currentOperatorRef);

+		}

+		return logicalOp;

+	}

 

-    /**

-     * used in select, group by to get no-column-expression columns

-     * 

-     * @param cols

-     * @return

-     */

-    public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent, List<ExprNodeDesc> cols,

-            ArrayList<LogicalVariable> variables) {

+	/**

+	 * used in select, group by to get no-column-expression columns

+	 * 

+	 * @param cols

+	 * @return

+	 */

+	public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent,

+			List<ExprNodeDesc> cols, ArrayList<LogicalVariable> variables) {

 

-        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();

+		ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();

 

-        /**

-         * variables to be appended in the assign operator

-         */

-        ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();

+		/**

+		 * variables to be appended in the assign operator

+		 */

+		ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();

 

-        // one variable can only be assigned once

-        for (ExprNodeDesc hiveExpr : cols) {

-            rewriteExpression(hiveExpr);

+		// one variable can only be assigned once

+		for (ExprNodeDesc hiveExpr : cols) {

+			rewriteExpression(hiveExpr);

 

-            if (hiveExpr instanceof ExprNodeColumnDesc) {

-                ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr;

-                String fieldName = desc2.getTabAlias() + "." + desc2.getColumn();

+			if (hiveExpr instanceof ExprNodeColumnDesc) {

+				ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr;

+				String fieldName = desc2.getTabAlias() + "."

+						+ desc2.getColumn();

 

-                // System.out.println("project expr: " + fieldName);

+				// System.out.println("project expr: " + fieldName);

 

-                if (fieldName.indexOf("$$") < 0) {

-                    LogicalVariable var = getVariable(fieldName, hiveExpr.getTypeInfo());

-                    desc2.setColumn(var.toString());

-                    desc2.setTabAlias("");

-                    variables.add(var);

-                } else {

-                    LogicalVariable var = nameToLogicalVariableMap.get(desc2.getColumn());

-                    String name = this.logicalVariableToFieldMap.get(var);

-                    var = this.getVariableOnly(name);

-                    variables.add(var);

-                }

-            } else {

-                Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr);

-                expressions.add(asterixExpr);

-                LogicalVariable var = getVariable(hiveExpr.getExprString() + asterixExpr.hashCode(),

-                        hiveExpr.getTypeInfo());

-                variables.add(var);

-                appendedVariables.add(var);

-            }

-        }

+				if (fieldName.indexOf("$$") < 0) {

+					LogicalVariable var = getVariable(fieldName,

+							hiveExpr.getTypeInfo());

+					desc2.setColumn(var.toString());

+					desc2.setTabAlias("");

+					variables.add(var);

+				} else {

+					LogicalVariable var = nameToLogicalVariableMap.get(desc2

+							.getColumn());

+					String name = this.logicalVariableToFieldMap.get(var);

+					var = this.getVariableOnly(name);

+					variables.add(var);

+				}

+			} else {

+				Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr);

+				expressions.add(asterixExpr);

+				LogicalVariable var = getVariable(hiveExpr.getExprString()

+						+ asterixExpr.hashCode(), hiveExpr.getTypeInfo());

+				variables.add(var);

+				appendedVariables.add(var);

+			}

+		}

 

-        /**

-         * create an assign operator to deal with appending

-         */

-        ILogicalOperator assignOp = null;

-        if (appendedVariables.size() > 0) {

-            assignOp = new AssignOperator(appendedVariables, expressions);

-            assignOp.getInputs().add(parent);

-        }

-        return assignOp;

-    }

+		/**

+		 * create an assign operator to deal with appending

+		 */

+		ILogicalOperator assignOp = null;

+		if (appendedVariables.size() > 0) {

+			assignOp = new AssignOperator(appendedVariables, expressions);

+			assignOp.getInputs().add(parent);

+		}

+		return assignOp;

+	}

 

-    private ILogicalPlan plan;

+	private ILogicalPlan plan;

 

-    public ILogicalPlan genLogicalPlan() {

-        plan = new ALogicalPlanImpl(rootOperators);

-        return plan;

-    }

+	public ILogicalPlan genLogicalPlan() {

+		plan = new ALogicalPlanImpl(rootOperators);

+		return plan;

+	}

 

-    public void printOperators() throws AlgebricksException {

-        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();

-        StringBuilder buffer = new StringBuilder();

-        PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);

-        outputWriter.println(buffer);

-        outputWriter.println("rewritten variables: ");

-        outputWriter.flush();

-        printVariables();

+	public void printOperators() throws AlgebricksException {

+		LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();

+		StringBuilder buffer = new StringBuilder();

+		PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);

+		outputWriter.println(buffer);

+		outputWriter.println("rewritten variables: ");

+		outputWriter.flush();

+		printVariables();

 

-    }

+	}

 

-    public static void setOutputPrinter(PrintWriter writer) {

-        outputWriter = writer;

-    }

+	public static void setOutputPrinter(PrintWriter writer) {

+		outputWriter = writer;

+	}

 

-    private void printVariables() {

-        Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap.entrySet();

+	private void printVariables() {

+		Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap

+				.entrySet();

 

-        for (Entry<String, LogicalVariable> entry : entries) {

-            outputWriter.println(entry.getKey() + " -> " + entry.getValue());

-        }

-        outputWriter.flush();

-    }

+		for (Entry<String, LogicalVariable> entry : entries) {

+			outputWriter.println(entry.getKey() + " -> " + entry.getValue());

+		}

+		outputWriter.flush();

+	}

 

-    /**

-     * generate the object inspector for the output of an operator

-     * 

-     * @param operator

-     *            The Hive operator

-     * @return an ObjectInspector object

-     */

-    public Schema generateInputSchema(Operator operator) {

-        List<String> variableNames = new ArrayList<String>();

-        List<TypeInfo> typeList = new ArrayList<TypeInfo>();

-        List<ColumnInfo> columns = operator.getSchema().getSignature();

+	/**

+	 * generate the object inspector for the output of an operator

+	 * 

+	 * @param operator

+	 *            The Hive operator

+	 * @return an ObjectInspector object

+	 */

+	public Schema generateInputSchema(Operator operator) {

+		List<String> variableNames = new ArrayList<String>();

+		List<TypeInfo> typeList = new ArrayList<TypeInfo>();

+		List<ColumnInfo> columns = operator.getSchema().getSignature();

 

-        for (ColumnInfo col : columns) {

-            // typeList.add();

-            TypeInfo type = col.getType();

-            typeList.add(type);

+		for (ColumnInfo col : columns) {

+			// typeList.add();

+			TypeInfo type = col.getType();

+			typeList.add(type);

 

-            String fieldName = col.getInternalName();

-            variableNames.add(fieldName);

-        }

+			String fieldName = col.getInternalName();

+			variableNames.add(fieldName);

+		}

 

-        return new Schema(variableNames, typeList);

-    }

+		return new Schema(variableNames, typeList);

+	}

 

-    /**

-     * rewrite the names of output columns for feature expression evaluators to

-     * use

-     * 

-     * @param operator

-     */

-    public void rewriteOperatorOutputSchema(Operator operator) {

-        List<ColumnInfo> columns = operator.getSchema().getSignature();

+	/**

+	 * rewrite the names of output columns for feature expression evaluators to

+	 * use

+	 * 

+	 * @param operator

+	 */

+	public void rewriteOperatorOutputSchema(Operator operator) {

+		List<ColumnInfo> columns = operator.getSchema().getSignature();

 

-        for (ColumnInfo column : columns) {

-            String columnName = column.getTabAlias() + "." + column.getInternalName();

-            if (columnName.indexOf("$$") < 0) {

-                LogicalVariable var = getVariable(columnName, column.getType());

-                column.setInternalName(var.toString());

-            }

-        }

-    }

+		for (ColumnInfo column : columns) {

+			String columnName = column.getTabAlias() + "."

+					+ column.getInternalName();

+			if (columnName.indexOf("$$") < 0) {

+				LogicalVariable var = getVariable(columnName, column.getType());

+				column.setInternalName(var.toString());

+			}

+		}

+	}

 

-    @Override

-    public void rewriteOperatorOutputSchema(List<LogicalVariable> variables, Operator operator) {

+	@Override

+	public void rewriteOperatorOutputSchema(List<LogicalVariable> variables,

+			Operator operator) {

 

-        printOperatorSchema(operator);

-        List<ColumnInfo> columns = operator.getSchema().getSignature();

-        if (variables.size() != columns.size()) {

-            throw new IllegalStateException("output cardinality error " + operator.getName() + " variable size: "

-                    + variables.size() + " expected " + columns.size());

-        }

+		//printOperatorSchema(operator);

+		List<ColumnInfo> columns = operator.getSchema().getSignature();

+		if (variables.size() != columns.size()) {

+			throw new IllegalStateException("output cardinality error "

+					+ operator.getName() + " variable size: "

+					+ variables.size() + " expected " + columns.size());

+		}

 

-        for (int i = 0; i < variables.size(); i++) {

-            LogicalVariable var = variables.get(i);

-            ColumnInfo column = columns.get(i);

-            String fieldName = column.getTabAlias() + "." + column.getInternalName();

-            if (fieldName.indexOf("$$") < 0) {

-                updateVariable(fieldName, var);

-                column.setInternalName(var.toString());

-            }

-        }

-        printOperatorSchema(operator);

-    }

+		for (int i = 0; i < variables.size(); i++) {

+			LogicalVariable var = variables.get(i);

+			ColumnInfo column = columns.get(i);

+			String fieldName = column.getTabAlias() + "."

+					+ column.getInternalName();

+			if (fieldName.indexOf("$$") < 0) {

+				updateVariable(fieldName, var);

+				column.setInternalName(var.toString());

+			}

+		}

+		//printOperatorSchema(operator);

+	}

 

-    /**

-     * rewrite an expression and substitute variables

-     * 

-     * @param expr

-     *            hive expression

-     */

-    public void rewriteExpression(ExprNodeDesc expr) {

-        if (expr instanceof ExprNodeColumnDesc) {

-            ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;

-            String fieldName = desc.getTabAlias() + "." + desc.getColumn();

-            if (fieldName.indexOf("$$") < 0) {

-                LogicalVariable var = getVariableOnly(fieldName);

-                if (var == null) {

-                    fieldName = "." + desc.getColumn();

-                    var = getVariableOnly(fieldName);

-                    if (var == null) {

-                        fieldName = "null." + desc.getColumn();

-                        var = getVariableOnly(fieldName);

-                        if (var == null) {

-                            throw new IllegalStateException(fieldName + " is wrong!!! ");

-                        }

-                    }

-                }

-                String name = this.logicalVariableToFieldMap.get(var);

-                var = getVariableOnly(name);

-                desc.setColumn(var.toString());

-            }

-        } else {

-            if (expr.getChildren() != null && expr.getChildren().size() > 0) {

-                List<ExprNodeDesc> children = expr.getChildren();

-                for (ExprNodeDesc desc : children)

-                    rewriteExpression(desc);

-            }

-        }

-    }

+	/**

+	 * rewrite an expression and substitute variables

+	 * 

+	 * @param expr

+	 *            hive expression

+	 */

+	public void rewriteExpression(ExprNodeDesc expr) {

+		if (expr instanceof ExprNodeColumnDesc) {

+			ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;

+			String fieldName = desc.getTabAlias() + "." + desc.getColumn();

+			if (fieldName.indexOf("$$") < 0) {

+				LogicalVariable var = getVariableOnly(fieldName);

+				if (var == null) {

+					fieldName = "." + desc.getColumn();

+					var = getVariableOnly(fieldName);

+					if (var == null) {

+						fieldName = "null." + desc.getColumn();

+						var = getVariableOnly(fieldName);

+						if (var == null) {

+							throw new IllegalStateException(fieldName

+									+ " is wrong!!! ");

+						}

+					}

+				}

+				String name = this.logicalVariableToFieldMap.get(var);

+				var = getVariableOnly(name);

+				desc.setColumn(var.toString());

+			}

+		} else {

+			if (expr.getChildren() != null && expr.getChildren().size() > 0) {

+				List<ExprNodeDesc> children = expr.getChildren();

+				for (ExprNodeDesc desc : children)

+					rewriteExpression(desc);

+			}

+		}

+	}

 

-    /**

-     * rewrite an expression and substitute variables

-     * 

-     * @param expr

-     *            hive expression

-     */

-    public void rewriteExpressionPartial(ExprNodeDesc expr) {

-        if (expr instanceof ExprNodeColumnDesc) {

-            ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;

-            String fieldName = desc.getTabAlias() + "." + desc.getColumn();

-            if (fieldName.indexOf("$$") < 0) {

-                LogicalVariable var = getVariableOnly(fieldName);

-                desc.setColumn(var.toString());

-            }

-        } else {

-            if (expr.getChildren() != null && expr.getChildren().size() > 0) {

-                List<ExprNodeDesc> children = expr.getChildren();

-                for (ExprNodeDesc desc : children)

-                    rewriteExpressionPartial(desc);

-            }

-        }

-    }

+	/**

+	 * rewrite an expression and substitute variables

+	 * 

+	 * @param expr

+	 *            hive expression

+	 */

+	public void rewriteExpressionPartial(ExprNodeDesc expr) {

+		if (expr instanceof ExprNodeColumnDesc) {

+			ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;

+			String fieldName = desc.getTabAlias() + "." + desc.getColumn();

+			if (fieldName.indexOf("$$") < 0) {

+				LogicalVariable var = getVariableOnly(fieldName);

+				desc.setColumn(var.toString());

+			}

+		} else {

+			if (expr.getChildren() != null && expr.getChildren().size() > 0) {

+				List<ExprNodeDesc> children = expr.getChildren();

+				for (ExprNodeDesc desc : children)

+					rewriteExpressionPartial(desc);

+			}

+		}

+	}

 

-    private void printOperatorSchema(Operator operator) {

-        System.out.println(operator.getName());

-        List<ColumnInfo> columns = operator.getSchema().getSignature();

-        for (ColumnInfo column : columns) {

-            System.out.print(column.getTabAlias() + "." + column.getInternalName() + "  ");

-        }

-        System.out.println();

-    }

+	// private void printOperatorSchema(Operator operator) {

+	// // System.out.println(operator.getName());

+	// // List<ColumnInfo> columns = operator.getSchema().getSignature();

+	// // for (ColumnInfo column : columns) {

+	// // System.out.print(column.getTabAlias() + "." +

+	// // column.getInternalName() + "  ");

+	// // }

+	// // System.out.println();

+	// }

 

-    /**

-     * translate scalar function expression

-     * 

-     * @param hiveExpr

-     * @return

-     */

-    public Mutable<ILogicalExpression> translateScalarFucntion(ExprNodeDesc hiveExpr) {

-        ILogicalExpression AlgebricksExpr;

+	/**

+	 * translate scalar function expression

+	 * 

+	 * @param hiveExpr

+	 * @return

+	 */

+	public Mutable<ILogicalExpression> translateScalarFucntion(

+			ExprNodeDesc hiveExpr) {

+		ILogicalExpression AlgebricksExpr;

 

-        if (hiveExpr instanceof ExprNodeGenericFuncDesc) {

-            List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();

-            List<ExprNodeDesc> children = hiveExpr.getChildren();

+		if (hiveExpr instanceof ExprNodeGenericFuncDesc) {

+			List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();

+			List<ExprNodeDesc> children = hiveExpr.getChildren();

 

-            for (ExprNodeDesc child : children)

-                arguments.add(translateScalarFucntion(child));

+			for (ExprNodeDesc child : children)

+				arguments.add(translateScalarFucntion(child));

 

-            ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;

-            GenericUDF genericUdf = funcExpr.getGenericUDF();

-            UDF udf = null;

-            if (genericUdf instanceof GenericUDFBridge) {

-                GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;

-                try {

-                    udf = bridge.getUdfClass().newInstance();

-                } catch (Exception e) {

-                    e.printStackTrace();

-                }

-            }

+			ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;

+			GenericUDF genericUdf = funcExpr.getGenericUDF();

+			UDF udf = null;

+			if (genericUdf instanceof GenericUDFBridge) {

+				GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;

+				try {

+					udf = bridge.getUdfClass().newInstance();

+				} catch (Exception e) {

+					e.printStackTrace();

+				}

+			}

 

-            /**

-             * set up the hive function

-             */

-            Object hiveFunction = genericUdf;

-            if (udf != null)

-                hiveFunction = udf;

+			/**

+			 * set up the hive function

+			 */

+			Object hiveFunction = genericUdf;

+			if (udf != null)

+				hiveFunction = udf;

 

-            FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE.getAlgebricksFunctionId(hiveFunction

-                    .getClass());

-            if (funcId == null) {

-                funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, hiveFunction.getClass().getName());

-            }

+			FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE

+					.getAlgebricksFunctionId(hiveFunction.getClass());

+			if (funcId == null) {

+				funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,

+						hiveFunction.getClass().getName());

+			}

 

-            Object functionInfo = null;

-            if (genericUdf instanceof GenericUDFBridge) {

-                functionInfo = funcExpr;

-            }

+			Object functionInfo = null;

+			if (genericUdf instanceof GenericUDFBridge) {

+				functionInfo = funcExpr;

+			}

 

-            /**

-             * generate the function call expression

-             */

-            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(

-                    funcId, functionInfo), arguments);

-            AlgebricksExpr = AlgebricksFuncExpr;

+			/**

+			 * generate the function call expression

+			 */

+			ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(

+					new HiveFunctionInfo(funcId, functionInfo), arguments);

+			AlgebricksExpr = AlgebricksFuncExpr;

 

-        } else if (hiveExpr instanceof ExprNodeColumnDesc) {

-            ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;

-            LogicalVariable var = this.getVariable(column.getColumn());

-            AlgebricksExpr = new VariableReferenceExpression(var);

+		} else if (hiveExpr instanceof ExprNodeColumnDesc) {

+			ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;

+			LogicalVariable var = this.getVariable(column.getColumn());

+			AlgebricksExpr = new VariableReferenceExpression(var);

 

-        } else if (hiveExpr instanceof ExprNodeFieldDesc) {

-            FunctionIdentifier funcId;

-            funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.FIELDACCESS);

+		} else if (hiveExpr instanceof ExprNodeFieldDesc) {

+			FunctionIdentifier funcId;

+			funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,

+					ExpressionConstant.FIELDACCESS);

 

-            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(

-                    funcId, hiveExpr));

-            AlgebricksExpr = AlgebricksFuncExpr;

-        } else if (hiveExpr instanceof ExprNodeConstantDesc) {

-            ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;

-            Object value = hiveConst.getValue();

-            AlgebricksExpr = new ConstantExpression(new HivesterixConstantValue(value));

-        } else if (hiveExpr instanceof ExprNodeNullDesc) {

-            FunctionIdentifier funcId;

-            funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.NULL);

+			ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(

+					new HiveFunctionInfo(funcId, hiveExpr));

+			AlgebricksExpr = AlgebricksFuncExpr;

+		} else if (hiveExpr instanceof ExprNodeConstantDesc) {

+			ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;

+			Object value = hiveConst.getValue();

+			AlgebricksExpr = new ConstantExpression(

+					new HivesterixConstantValue(value));

+		} else if (hiveExpr instanceof ExprNodeNullDesc) {

+			FunctionIdentifier funcId;

+			funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,

+					ExpressionConstant.NULL);

 

-            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(

-                    funcId, hiveExpr));

+			ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(

+					new HiveFunctionInfo(funcId, hiveExpr));

 

-            AlgebricksExpr = AlgebricksFuncExpr;

-        } else {

-            throw new IllegalStateException("unknown hive expression");

-        }

-        return new MutableObject<ILogicalExpression>(AlgebricksExpr);

-    }

+			AlgebricksExpr = AlgebricksFuncExpr;

+		} else {

+			throw new IllegalStateException("unknown hive expression");

+		}

+		return new MutableObject<ILogicalExpression>(AlgebricksExpr);

+	}

 

-    /**

-     * translate aggregation function expression

-     * 

-     * @param aggregateDesc

-     * @return

-     */

-    public Mutable<ILogicalExpression> translateAggregation(AggregationDesc aggregateDesc) {

+	/**

+	 * translate aggregation function expression

+	 * 

+	 * @param aggregateDesc

+	 * @return

+	 */

+	public Mutable<ILogicalExpression> translateAggregation(

+			AggregationDesc aggregateDesc) {

 

-        String UDAFName = aggregateDesc.getGenericUDAFName();

+		String UDAFName = aggregateDesc.getGenericUDAFName();

 

-        List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();

-        List<ExprNodeDesc> children = aggregateDesc.getParameters();

+		List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();

+		List<ExprNodeDesc> children = aggregateDesc.getParameters();

 

-        for (ExprNodeDesc child : children)

-            arguments.add(translateScalarFucntion(child));

+		for (ExprNodeDesc child : children)

+			arguments.add(translateScalarFucntion(child));

 

-        FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDAFName + "("

-                + aggregateDesc.getMode() + ")");

-        HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);

-        AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(funcInfo, false,

-                arguments);

-        return new MutableObject<ILogicalExpression>(aggregationExpression);

-    }

+		FunctionIdentifier funcId = new FunctionIdentifier(

+				ExpressionConstant.NAMESPACE, UDAFName + "("

+						+ aggregateDesc.getMode() + ")");

+		HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);

+		AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(

+				funcInfo, false, arguments);

+		return new MutableObject<ILogicalExpression>(aggregationExpression);

+	}

 

-    /**

-     * translate aggregation function expression

-     * 

-     * @param aggregator

-     * @return

-     */

-    public Mutable<ILogicalExpression> translateUnnestFunction(UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {

+	/**

+	 * translate aggregation function expression

+	 * 

+	 * @param aggregator

+	 * @return

+	 */

+	public Mutable<ILogicalExpression> translateUnnestFunction(
+			UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {
 
-        String UDTFName = udtfDesc.getUDTFName();
+		String UDTFName = udtfDesc.getUDTFName();
 
-        FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDTFName);
-        UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(new HiveFunctionInfo(
-                funcId, udtfDesc));
-        unnestingExpression.getArguments().add(argument);
-        return new MutableObject<ILogicalExpression>(unnestingExpression);
-    }
+		FunctionIdentifier funcId = new FunctionIdentifier(
+				ExpressionConstant.NAMESPACE, UDTFName);
+		UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(
+				new HiveFunctionInfo(funcId, udtfDesc));
+		unnestingExpression.getArguments().add(argument);
+		return new MutableObject<ILogicalExpression>(unnestingExpression);
+	}
 
-    /**
-     * get typeinfo
-     */
-    @Override
-    public TypeInfo getType(LogicalVariable var) {
-        return variableToType.get(var);
-    }
+	/**
+	 * get typeinfo
+	 */
+	@Override
+	public TypeInfo getType(LogicalVariable var) {
+		return variableToType.get(var);
+	}
 
-    /**
-     * get variable from variable name
-     */
-    @Override
-    public LogicalVariable getVariable(String name) {
-        return nameToLogicalVariableMap.get(name);
-    }
+	/**
+	 * get variable from variable name
+	 */
+	@Override
+	public LogicalVariable getVariable(String name) {
+		return nameToLogicalVariableMap.get(name);
+	}
 
-    @Override
-    public LogicalVariable getVariableFromFieldName(String fieldName) {
-        return this.getVariableOnly(fieldName);
-    }
+	@Override
+	public LogicalVariable getVariableFromFieldName(String fieldName) {
+		return this.getVariableOnly(fieldName);
+	}
 
-    /**
-     * set the metadata provider
-     */
-    @Override
-    public void setMetadataProvider(IMetadataProvider<PartitionDesc, Object> metadata) {
-        this.metaData = metadata;
-    }
+	/**
+	 * set the metadata provider
+	 */
+	@Override
+	public void setMetadataProvider(
+			IMetadataProvider<PartitionDesc, Object> metadata) {
+		this.metaData = metadata;
+	}
 
-    /**
-     * insert ReplicateOperator when necessary
-     */
-    private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {
-        Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
-        buildChildToParentsMapping(roots, childToParentsMap);
-        for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap.entrySet()) {
-            List<Mutable<ILogicalOperator>> pList = entry.getValue();
-            if (pList.size() > 1) {
-                ILogicalOperator rop = new ReplicateOperator(pList.size());
-                Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
-                Mutable<ILogicalOperator> childRef = entry.getKey();
-                rop.getInputs().add(childRef);
-                for (Mutable<ILogicalOperator> parentRef : pList) {
-                    ILogicalOperator parentOp = parentRef.getValue();
-                    int index = parentOp.getInputs().indexOf(childRef);
-                    parentOp.getInputs().set(index, ropRef);
-                }
-            }
-        }
-    }
+	/**
+	 * insert ReplicateOperator when necessary
+	 */
+	private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {
+		Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
+		buildChildToParentsMapping(roots, childToParentsMap);
+		for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap
+				.entrySet()) {
+			List<Mutable<ILogicalOperator>> pList = entry.getValue();
+			if (pList.size() > 1) {
+				ILogicalOperator rop = new ReplicateOperator(pList.size());
+				Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(
+						rop);
+				Mutable<ILogicalOperator> childRef = entry.getKey();
+				rop.getInputs().add(childRef);
+				for (Mutable<ILogicalOperator> parentRef : pList) {
+					ILogicalOperator parentOp = parentRef.getValue();
+					int index = parentOp.getInputs().indexOf(childRef);
+					parentOp.getInputs().set(index, ropRef);
+				}
+			}
+		}
+	}
 
-    /**
-     * build the mapping from child to Parents
-     * 
-     * @param roots
-     * @param childToParentsMap
-     */
-    private void buildChildToParentsMapping(List<Mutable<ILogicalOperator>> roots,
-            Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {
-        for (Mutable<ILogicalOperator> opRef : roots) {
-            List<Mutable<ILogicalOperator>> childRefs = opRef.getValue().getInputs();
-            for (Mutable<ILogicalOperator> childRef : childRefs) {
-                List<Mutable<ILogicalOperator>> parentList = map.get(childRef);
-                if (parentList == null) {
-                    parentList = new ArrayList<Mutable<ILogicalOperator>>();
-                    map.put(childRef, parentList);
-                }
-                if (!parentList.contains(opRef))
-                    parentList.add(opRef);
-            }
-            buildChildToParentsMapping(childRefs, map);
-        }
-    }
+	/**
+	 * build the mapping from child to Parents
+	 * 
+	 * @param roots
+	 * @param childToParentsMap
+	 */
+	private void buildChildToParentsMapping(
+			List<Mutable<ILogicalOperator>> roots,
+			Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {
+		for (Mutable<ILogicalOperator> opRef : roots) {
+			List<Mutable<ILogicalOperator>> childRefs = opRef.getValue()
+					.getInputs();
+			for (Mutable<ILogicalOperator> childRef : childRefs) {
+				List<Mutable<ILogicalOperator>> parentList = map.get(childRef);
+				if (parentList == null) {
+					parentList = new ArrayList<Mutable<ILogicalOperator>>();
+					map.put(childRef, parentList);
+				}
+				if (!parentList.contains(opRef))
+					parentList.add(opRef);
+			}
+			buildChildToParentsMapping(childRefs, map);
+		}
+	}
 }
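
The insertReplicateOperator/buildChildToParentsMapping pair above splices a ReplicateOperator under any operator that feeds more than one parent. Below is a minimal standalone sketch of the same rewiring over plain Java collections; Op, insertReplicates and the rest are illustrative names, not the Algebricks Mutable<ILogicalOperator> model:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class Op {
        final String name;
        final List<Op> inputs = new ArrayList<Op>();
        Op(String name) { this.name = name; }
    }

    public class ReplicateSketch {
        // splice a replicate node under any op consumed by more than one parent
        static void insertReplicates(List<Op> roots) {
            Map<Op, List<Op>> childToParents = new HashMap<Op, List<Op>>();
            buildMapping(roots, childToParents);
            for (Map.Entry<Op, List<Op>> e : childToParents.entrySet()) {
                List<Op> parents = e.getValue();
                if (parents.size() > 1) {
                    Op rep = new Op("replicate");
                    rep.inputs.add(e.getKey());
                    for (Op parent : parents) {
                        int i = parent.inputs.indexOf(e.getKey());
                        parent.inputs.set(i, rep); // parent now reads from the replicate node
                    }
                }
            }
        }

        // recursively record, for every operator, the operators that consume it
        static void buildMapping(List<Op> ops, Map<Op, List<Op>> map) {
            for (Op op : ops) {
                for (Op child : op.inputs) {
                    List<Op> parents = map.get(child);
                    if (parents == null) {
                        parents = new ArrayList<Op>();
                        map.put(child, parents);
                    }
                    if (!parents.contains(op)) {
                        parents.add(op);
                    }
                }
                buildMapping(op.inputs, map);
            }
        }

        public static void main(String[] args) {
            Op scan = new Op("scan");
            Op agg = new Op("agg");
            Op join = new Op("join");
            agg.inputs.add(scan);
            join.inputs.add(scan); // scan feeds two parents
            List<Op> roots = new ArrayList<Op>();
            roots.add(agg);
            roots.add(join);
            insertReplicates(roots);
            System.out.println(agg.inputs.get(0).name); // prints: replicate
        }
    }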

diff --git a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index 6c1ac72..ed5ab70 100644
--- a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -74,8 +74,6 @@
 
     private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
 
-    // private static final String[] locConstraints = {}
-
     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
     static {
@@ -273,7 +271,6 @@
              * list of map-reduce tasks
              */
             Task<? extends Serializable> task = rootTasks.get(i);
-            // System.out.println("!" + task.getName());
 
             if (task instanceof MapRedTask) {
                 List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);
diff --git a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
index d248486..9e62c73 100644
--- a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
+++ b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
@@ -110,7 +110,7 @@
 			if (returnedSplits[i] != null)
 				splits[i] = returnedSplits[i];
 
-		System.out.println("!!! number of splits: " + splits.length);
+		System.out.println("number of splits: " + splits.length);
 		constraintsByHostNames = new String[splits.length];
 		for (int i = 0; i < splits.length; i++) {
 			try {
diff --git a/hyracks/hyracks-api/pom.xml b/hyracks/hyracks-api/pom.xml
index 72f0d8b..7778b31 100644
--- a/hyracks/hyracks-api/pom.xml
+++ b/hyracks/hyracks-api/pom.xml
@@ -17,6 +17,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index f36b7b3..0eac9a2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -17,6 +17,7 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.rewriter.ActivityClusterGraphRewriter;
 
 public class JobSpecificationActivityClusterGraphGeneratorFactory implements IActivityClusterGraphGeneratorFactory {
     private static final long serialVersionUID = 1L;
@@ -78,6 +79,8 @@
         return new IActivityClusterGraphGenerator() {
             @Override
             public ActivityClusterGraph initialize() {
+                ActivityClusterGraphRewriter rewriter = new ActivityClusterGraphRewriter();
+                rewriter.rewrite(acg);
                 return acg;
             }
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
index 6698ff7..9fb2b08 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
@@ -33,7 +33,7 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 
-public final class ActivityCluster implements Serializable {
+public class ActivityCluster implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final ActivityClusterGraph acg;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
new file mode 100644
index 0000000..c6761e9
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.rewriter.runtime.SuperActivity;
+
+/**
+ * This class rewrites the ActivityClusterGraph to eliminate
+ * all one-to-one connections and merge one-to-one connected
+ * DAGs into super activities.
+ * <p>
+ * Each super activity internally maintains a DAG and executes it at runtime.
+ * 
+ * @author yingyib
+ */
+public class ActivityClusterGraphRewriter {
+    private static String ONE_TO_ONE_CONNECTOR = "edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor";
+
+    /**
+     * rewrite an activity cluster graph to eliminate
+     * all one-to-one connections and merge one-to-one connected
+     * DAGs into super activities.
+     * 
+     * @param acg
+     *            the activity cluster graph
+     */
+    public void rewrite(ActivityClusterGraph acg) {
+        acg.getActivityMap().clear();
+        acg.getConnectorMap().clear();
+        Map<IActivity, SuperActivity> invertedActivitySuperActivityMap = new HashMap<IActivity, SuperActivity>();
+        for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+            rewriteIntraActivityCluster(entry.getValue(), invertedActivitySuperActivityMap);
+        }
+        for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+            rewriteInterActivityCluster(entry.getValue(), invertedActivitySuperActivityMap);
+        }
+        invertedActivitySuperActivityMap.clear();
+    }
+
+    /**
+     * rewrite the blocking relationships among activity clusters
+     * 
+     * @param ac
+     *            the activity cluster to be rewritten
+     */
+    private void rewriteInterActivityCluster(ActivityCluster ac,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
+        Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
+        Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<ActivityId, ActivityId>();
+        for (Entry<IActivity, SuperActivity> entry : invertedActivitySuperActivityMap.entrySet()) {
+            invertedAid2SuperAidMap.put(entry.getKey().getActivityId(), entry.getValue().getActivityId());
+        }
+        Map<ActivityId, Set<ActivityId>> replacedBlocked2BlockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+        for (Entry<ActivityId, Set<ActivityId>> entry : blocked2BlockerMap.entrySet()) {
+            ActivityId blocked = entry.getKey();
+            ActivityId replacedBlocked = invertedAid2SuperAidMap.get(blocked);
+            Set<ActivityId> blockers = entry.getValue();
+            Set<ActivityId> replacedBlockers = null;
+            if (blockers != null) {
+                replacedBlockers = new HashSet<ActivityId>();
+                for (ActivityId blocker : blockers) {
+                    replacedBlockers.add(invertedAid2SuperAidMap.get(blocker));
+                    ActivityCluster dependingAc = ac.getActivityClusterGraph().getActivityMap()
+                            .get(invertedAid2SuperAidMap.get(blocker));
+                    if (!ac.getDependencies().contains(dependingAc)) {
+                        ac.getDependencies().add(dependingAc);
+                    }
+                }
+            }
+            if (replacedBlockers != null) {
+                Set<ActivityId> existingBlockers = replacedBlocked2BlockerMap.get(replacedBlocked);
+                if (existingBlockers == null) {
+                    replacedBlocked2BlockerMap.put(replacedBlocked, replacedBlockers);
+                } else {
+                    existingBlockers.addAll(replacedBlockers);
+                    replacedBlocked2BlockerMap.put(replacedBlocked, existingBlockers);
+                }
+            }
+        }
+        blocked2BlockerMap.clear();
+        blocked2BlockerMap.putAll(replacedBlocked2BlockerMap);
+    }
+
+    /**
+     * rewrite an activity cluster internally
+     * 
+     * @param ac
+     *            the activity cluster to be rewritten
+     */
+    private void rewriteIntraActivityCluster(ActivityCluster ac,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
+        Map<ActivityId, IActivity> activities = ac.getActivityMap();
+        Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
+        Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
+        Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap = ac
+                .getConnectorActivityMap();
+        ActivityClusterGraph acg = ac.getActivityClusterGraph();
+        Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+        Map<ActivityId, SuperActivity> superActivities = new HashMap<ActivityId, SuperActivity>();
+        Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<ActivityId, Queue<IActivity>>();
+
+        /**
+         * Build the initial super activities
+         */
+        for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
+            ActivityId activityId = entry.getKey();
+            IActivity activity = entry.getValue();
+            if (activityInputMap.get(activityId) == null) {
+                startActivities.put(activityId, activity);
+                /**
+                 * use the start activity's id as the id of the super activity
+                 */
+                createNewSuperActivity(ac, superActivities, toBeExpendedMap, invertedActivitySuperActivityMap,
+                        activityId, activity);
+            }
+        }
+
+        /**
+         * expand the one-to-one connected activities in BFS order.
+         * after the while-loop, the original activities are partitioned
+         * into equivalence classes, one per super activity.
+         */
+        Map<ActivityId, SuperActivity> clonedSuperActivities = new HashMap<ActivityId, SuperActivity>();
+        while (toBeExpendedMap.size() > 0) {
+            clonedSuperActivities.clear();
+            clonedSuperActivities.putAll(superActivities);
+            for (Entry<ActivityId, SuperActivity> entry : clonedSuperActivities.entrySet()) {
+                ActivityId superActivityId = entry.getKey();
+                SuperActivity superActivity = entry.getValue();
+
+                /**
+                 * for the case where the super activity has already been swallowed
+                 */
+                if (superActivities.get(superActivityId) == null) {
+                    continue;
+                }
+
+                /**
+                 * expand the super activity
+                 */
+                Queue<IActivity> toBeExpended = toBeExpendedMap.get(superActivityId);
+                if (toBeExpended == null) {
+                    /**
+                     * Nothing to expand
+                     */
+                    continue;
+                }
+                IActivity expendingActivity = toBeExpended.poll();
+                List<IConnectorDescriptor> outputConnectors = activityOutputMap.get(expendingActivity.getActivityId());
+                if (outputConnectors != null) {
+                    for (IConnectorDescriptor outputConn : outputConnectors) {
+                        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = connectorActivityMap
+                                .get(outputConn.getConnectorId());
+                        IActivity newActivity = endPoints.getRight().getLeft();
+                        SuperActivity existingSuperActivity = invertedActivitySuperActivityMap.get(newActivity);
+                        if (outputConn.getClass().getName().contains(ONE_TO_ONE_CONNECTOR)) {
+                            /**
+                             * expand the super activity along a one-to-one out-bound connection
+                             */
+                            if (existingSuperActivity == null) {
+                                superActivity.addActivity(newActivity);
+                                toBeExpended.add(newActivity);
+                                invertedActivitySuperActivityMap.put(newActivity, superActivity);
+                            } else {
+                                /**
+                                 * the two activities already in the same super activity
+                                 */
+                                if (existingSuperActivity == superActivity) {
+                                    continue;
+                                }
+                                /**
+                                 * swallow an existing super activity
+                                 */
+                                swallowExistingSuperActivity(superActivities, toBeExpendedMap,
+                                        invertedActivitySuperActivityMap, superActivity, superActivityId,
+                                        existingSuperActivity);
+                            }
+                        } else {
+                            if (existingSuperActivity == null) {
+                                /**
+                                 * create a new super activity
+                                 */
+                                createNewSuperActivity(ac, superActivities, toBeExpendedMap,
+                                        invertedActivitySuperActivityMap, newActivity.getActivityId(), newActivity);
+                            }
+                        }
+                    }
+                }
+
+                /**
+                 * remove the to-be-expanded queue if it is empty
+                 */
+                if (toBeExpended.size() == 0) {
+                    toBeExpendedMap.remove(superActivityId);
+                }
+            }
+        }
+
+        Map<ConnectorDescriptorId, IConnectorDescriptor> connMap = ac.getConnectorMap();
+        Map<ConnectorDescriptorId, RecordDescriptor> connRecordDesc = ac.getConnectorRecordDescriptorMap();
+        Map<SuperActivity, Integer> superActivityProducerPort = new HashMap<SuperActivity, Integer>();
+        Map<SuperActivity, Integer> superActivityConsumerPort = new HashMap<SuperActivity, Integer>();
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            superActivityProducerPort.put(entry.getValue(), 0);
+            superActivityConsumerPort.put(entry.getValue(), 0);
+        }
+
+        /**
+         * create a new activity cluster to replace the old activity cluster
+         */
+        ActivityCluster newActivityCluster = new ActivityCluster(acg, ac.getId());
+        newActivityCluster.setConnectorPolicyAssignmentPolicy(ac.getConnectorPolicyAssignmentPolicy());
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            newActivityCluster.addActivity(entry.getValue());
+            acg.getActivityMap().put(entry.getKey(), newActivityCluster);
+        }
+
+        /**
+         * Setup connectors: either inside a super activity or among super activities
+         */
+        for (Entry<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> entry : connectorActivityMap
+                .entrySet()) {
+            ConnectorDescriptorId connectorId = entry.getKey();
+            Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = entry.getValue();
+            IActivity producerActivity = endPoints.getLeft().getLeft();
+            IActivity consumerActivity = endPoints.getRight().getLeft();
+            int producerPort = endPoints.getLeft().getRight();
+            int consumerPort = endPoints.getRight().getRight();
+            RecordDescriptor recordDescriptor = connRecordDesc.get(connectorId);
+            IConnectorDescriptor conn = connMap.get(connectorId);
+            if (conn.getClass().getName().contains(ONE_TO_ONE_CONNECTOR)) {
+                /**
+                 * connection edge between inner activities
+                 */
+                SuperActivity residingSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
+                residingSuperActivity.connect(conn, producerActivity, producerPort, consumerActivity, consumerPort,
+                        recordDescriptor);
+            } else {
+                /**
+                 * connection edge between super activities
+                 */
+                SuperActivity producerSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
+                SuperActivity consumerSuperActivity = invertedActivitySuperActivityMap.get(consumerActivity);
+                int producerSAPort = superActivityProducerPort.get(producerSuperActivity);
+                int consumerSAPort = superActivityConsumerPort.get(consumerSuperActivity);
+                newActivityCluster.addConnector(conn);
+                newActivityCluster.connect(conn, producerSuperActivity, producerSAPort, consumerSuperActivity,
+                        consumerSAPort, recordDescriptor);
+
+                /**
+                 * bridge the port
+                 */
+                producerSuperActivity.setClusterOutputIndex(producerSAPort, producerActivity.getActivityId(),
+                        producerPort);
+                consumerSuperActivity.setClusterInputIndex(consumerSAPort, consumerActivity.getActivityId(),
+                        consumerPort);
+                acg.getConnectorMap().put(connectorId, newActivityCluster);
+
+                /**
+                 * increase the port numbers for the producer and the consumer
+                 */
+                superActivityProducerPort.put(producerSuperActivity, ++producerSAPort);
+                superActivityConsumerPort.put(consumerSuperActivity, ++consumerSAPort);
+            }
+        }
+
+        /**
+         * Set up the roots of the new activity cluster
+         */
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            List<IConnectorDescriptor> connIds = newActivityCluster.getActivityOutputMap().get(entry.getKey());
+            if (connIds == null || connIds.size() == 0) {
+                newActivityCluster.addRoot(entry.getValue());
+            }
+        }
+
+        /**
+         * set up the blocked2Blocker mapping, which will be updated in the rewriteInterActivityCluster call
+         */
+        newActivityCluster.getBlocked2BlockerMap().putAll(ac.getBlocked2BlockerMap());
+
+        /**
+         * replace the old activity cluster with the new activity cluster
+         */
+        acg.getActivityClusterMap().put(ac.getId(), newActivityCluster);
+    }
+
+    /**
+     * Create a new super activity
+     * 
+     * @param acg
+     *            the activity cluster
+     * @param superActivities
+     *            the map from activity id to current super activities
+     * @param toBeExpendedMap
+     *            the map from an existing super activity to its BFS expansion queue of the original activities
+     * @param invertedActivitySuperActivityMap
+     *            the map from the original activities to their hosted super activities
+     * @param activityId
+     *            the activity id for the new super activity, which is the first added activity's id in the super activity
+     * @param activity
+     *            the first activity added to the new super activity
+     */
+    private void createNewSuperActivity(ActivityCluster acg, Map<ActivityId, SuperActivity> superActivities,
+            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, ActivityId activityId, IActivity activity) {
+        SuperActivity superActivity = new SuperActivity(acg.getActivityClusterGraph(), acg.getId(), activityId);
+        superActivities.put(activityId, superActivity);
+        superActivity.addActivity(activity);
+        Queue<IActivity> toBeExpended = new LinkedList<IActivity>();
+        toBeExpended.add(activity);
+        toBeExpendedMap.put(activityId, toBeExpended);
+        invertedActivitySuperActivityMap.put(activity, superActivity);
+    }
+
+    /**
+     * One super activity swallows another existing super activity.
+     * 
+     * @param superActivities
+     *            the map from activity id to current super activities
+     * @param toBeExpendedMap
+     *            the map from an existing super activity to its BFS expansion queue of the original activities
+     * @param invertedActivitySuperActivityMap
+     *            the map from the original activities to their hosted super activities
+     * @param superActivity
+     *            the "swallowing" super activity
+     * @param superActivityId
+     *            the activity id for the "swallowing" super activity, which is also the first added activity's id in the super activity
+     * @param existingSuperActivity
+     *            an existing super activity which is to be swallowed by the "swallowing" super activity
+     */
+    private void swallowExistingSuperActivity(Map<ActivityId, SuperActivity> superActivities,
+            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, SuperActivity superActivity,
+            ActivityId superActivityId, SuperActivity existingSuperActivity) {
+        ActivityId existingSuperActivityId = existingSuperActivity.getActivityId();
+        superActivities.remove(existingSuperActivityId);
+        for (Entry<ActivityId, IActivity> existingEntry : existingSuperActivity.getActivityMap().entrySet()) {
+            IActivity existingActivity = existingEntry.getValue();
+            superActivity.addActivity(existingActivity);
+            invertedActivitySuperActivityMap.put(existingActivity, superActivity);
+        }
+        Queue<IActivity> tbeQueue = toBeExpendedMap.get(superActivityId);
+        Queue<IActivity> existingTbeQueque = toBeExpendedMap.remove(existingSuperActivityId);
+        if (existingTbeQueque != null) {
+            tbeQueue.addAll(existingTbeQueque);
+        }
+    }
+}
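
For intuition, the net effect of the expansion loop above is a grouping: activities that are transitively connected by one-to-one connectors end up in the same super activity, and when two growing groups meet across a one-to-one edge one of them swallows the other. The standalone sketch below reproduces that grouping outcome with plain sets; it deliberately skips the rewriter's BFS queue bookkeeping and port bridging, and Edge, merge and the activity names are illustrative, not the Hyracks API:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class MergeSketch {
        static class Edge {
            final String src, dst;
            final boolean oneToOne;
            Edge(String src, String dst, boolean oneToOne) {
                this.src = src;
                this.dst = dst;
                this.oneToOne = oneToOne;
            }
        }

        // group activities that are transitively connected by one-to-one edges
        static Set<Set<String>> merge(List<String> activities, List<Edge> edges) {
            Map<String, Set<String>> groupOf = new HashMap<String, Set<String>>();
            for (String a : activities) {
                Set<String> group = new HashSet<String>();
                group.add(a);
                groupOf.put(a, group);
            }
            for (Edge e : edges) {
                if (!e.oneToOne) {
                    continue; // non-one-to-one edges remain inter-group connectors
                }
                Set<String> g1 = groupOf.get(e.src);
                Set<String> g2 = groupOf.get(e.dst);
                if (g1 == g2) {
                    continue; // already in the same group
                }
                g1.addAll(g2); // "swallow" the other group
                for (String moved : g2) {
                    groupOf.put(moved, g1);
                }
            }
            return new HashSet<Set<String>>(groupOf.values());
        }

        public static void main(String[] args) {
            List<String> acts = new ArrayList<String>();
            acts.add("scan");
            acts.add("filter");
            acts.add("project");
            acts.add("sort");
            List<Edge> edges = new ArrayList<Edge>();
            edges.add(new Edge("scan", "filter", true));
            edges.add(new Edge("filter", "project", true));
            edges.add(new Edge("project", "sort", false)); // e.g. a hash connector
            // prints two groups: [scan, filter, project] and [sort]
            System.out.println(merge(acts, edges));
        }
    }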
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
new file mode 100644
index 0000000..07b7ffc
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+
+/**
+ * All the connectors in a OneToOneConnectedActivityCluster are OneToOneConnectorDescriptors.
+ * 
+ * @author yingyib
+ */
+public class OneToOneConnectedActivityCluster extends ActivityCluster {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final Map<Integer, Pair<ActivityId, Integer>> clusterInputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
+    protected final Map<Integer, Pair<ActivityId, Integer>> clusterOutputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
+    protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterOutputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>();
+    protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterInputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>();
+
+    public OneToOneConnectedActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
+        super(acg, id);
+    }
+
+    /**
+     * Set up the mapping of the cluster's output channel to an internal activity and its output channel
+     * 
+     * @param clusterOutputIndex
+     *            the output channel index for the cluster
+     * @param activityId
+     *            the id of the internal activity which produces the corresponding output
+     * @param activityOutputIndex
+     *            the output channel index of the internal activity which corresponds to the output channel of the cluster of activities
+     */
+    public void setClusterOutputIndex(int clusterOutputIndex, ActivityId activityId, int activityOutputIndex) {
+        clusterOutputIndexMap.put(clusterOutputIndex, Pair.of(activityId, activityOutputIndex));
+        invertedClusterOutputIndexMap.put(Pair.of(activityId, activityOutputIndex), clusterOutputIndex);
+    }
+
+    /**
+     * get the internal activity and its output channel that correspond to a cluster output channel
+     * 
+     * @param clusterOutputIndex
+     *            the output channel index for the cluster
+     * @return a pair containing the activity id of the corresponding internal activity and the output channel index
+     */
+    public Pair<ActivityId, Integer> getActivityIdOutputIndex(int clusterOutputIndex) {
+        return clusterOutputIndexMap.get(clusterOutputIndex);
+    }
+
+    /**
+     * Set up the mapping of the cluster's input channel to an internal activity and its input channel
+     * 
+     * @param clusterInputIndex
+     *            the input channel index for the cluster
+     * @param activityId
+     *            the id of the internal activity which consumes the corresponding input
+     * @param activityInputIndex
+     *            the input channel index of the internal activity which corresponds to the input channel of the cluster of activities
+     */
+    public void setClusterInputIndex(int clusterInputIndex, ActivityId activityId, int activityInputIndex) {
+        clusterInputIndexMap.put(clusterInputIndex, Pair.of(activityId, activityInputIndex));
+        invertedClusterInputIndexMap.put(Pair.of(activityId, activityInputIndex), clusterInputIndex);
+    }
+
+    /**
+     * get the internal activity and its input channel that correspond to a cluster input channel
+     * 
+     * @param clusterInputIndex
+     *            the input channel index for the cluster
+     * @return a pair containing the activity id of the corresponding internal activity and the input channel index
+     */
+    public Pair<ActivityId, Integer> getActivityIdInputIndex(int clusterInputIndex) {
+        return clusterInputIndexMap.get(clusterInputIndex);
+    }
+
+    /**
+     * Get the cluster input channel of an input-boundary activity and its input channel
+     * 
+     * @param activityInputChannel
+     *            the input-boundary activity and its input channel
+     * @return the cluster input channel
+     */
+    public int getClusterInputIndex(Pair<ActivityId, Integer> activityInputChannel) {
+        Integer channel = invertedClusterInputIndexMap.get(activityInputChannel);
+        return channel == null ? -1 : channel;
+    }
+
+    /**
+     * Get the cluster output channel of an output-boundary activity and its output channel
+     * 
+     * @param activityOutputChannel
+     *            the output-boundary activity and its output channel
+     * @return the cluster output channel
+     */
+    public int getClusterOutputIndex(Pair<ActivityId, Integer> activityOutputChannel) {
+        Integer channel = invertedClusterOutputIndexMap.get(activityOutputChannel);
+        return channel == null ? -1 : channel;
+    }
+
+}
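
The four index maps above use Pair<ActivityId, Integer> as HashMap keys. That works because org.apache.commons.lang3.tuple.Pair defines value-based equals() and hashCode(), so a freshly built Pair.of(activityId, channel) finds the entry stored under an equal pair. A quick demonstration (Pair is the real commons-lang3 class; the string id is just a stand-in for ActivityId):

    import org.apache.commons.lang3.tuple.Pair;

    public class PairKeyDemo {
        public static void main(String[] args) {
            // two separately constructed pairs with equal contents
            Pair<String, Integer> a = Pair.of("activity-1", 0);
            Pair<String, Integer> b = Pair.of("activity-1", 0);
            System.out.println(a.equals(b));                  // true
            System.out.println(a.hashCode() == b.hashCode()); // true
        }
    }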
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
new file mode 100644
index 0000000..734ff85
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter.runtime;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.rewriter.OneToOneConnectedActivityCluster;
+
+/**
+ * This class can be used to execute a DAG of activities inside which
+ * there are only one-to-one connectors.
+ * 
+ * @author yingyib
+ */
+public class SuperActivity extends OneToOneConnectedActivityCluster implements IActivity {
+    private static final long serialVersionUID = 1L;
+    private final ActivityId activityId;
+
+    public SuperActivity(ActivityClusterGraph acg, ActivityClusterId id, ActivityId activityId) {
+        super(acg, id);
+        this.activityId = activityId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+        Map<ActivityId, IActivity> activities = getActivityMap();
+        for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
+            /**
+             * extract start activities
+             */
+            List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey());
+            if (conns == null || conns.size() == 0) {
+                startActivities.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        /**
+         * wrap a RecordDescriptorProvider for the super activity
+         */
+        IRecordDescriptorProvider wrappedRecDescProvider = new IRecordDescriptorProvider() {
+
+            @Override
+            public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
+                if (startActivities.get(aid) != null) {
+                    /**
+                     * if the activity is a start (input boundary) activity
+                     */
+                    int superActivityInputChannel = SuperActivity.this.getClusterInputIndex(Pair.of(aid, inputIndex));
+                    if (superActivityInputChannel >= 0) {
+                        return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
+                    }
+                }
+                if (SuperActivity.this.getActivityMap().get(aid) != null) {
+                    /**
+                     * if the activity is an internal activity of the super activity
+                     */
+                    IConnectorDescriptor conn = getActivityInputMap().get(aid).get(inputIndex);
+                    return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                }
+
+                /**
+                 * the following is for the case where the activity is in other SuperActivities
+                 */
+                ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+                for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+                    ActivityCluster ac = entry.getValue();
+                    for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+                        SuperActivity sa = (SuperActivity) saEntry.getValue();
+                        if (sa.getActivityMap().get(aid) != null) {
+                            List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
+                            if (conns != null && conns.size() > inputIndex) {
+                                IConnectorDescriptor conn = conns.get(inputIndex);
+                                return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                            } else {
+                                int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
+                                if (superActivityInputChannel >= 0) {
+                                    return recordDescProvider.getInputRecordDescriptor(sa.getActivityId(),
+                                            superActivityInputChannel);
+                                }
+                            }
+                        }
+                    }
+                }
+                return null;
+            }
+
+            @Override
+            public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
+                /**
+                 * if the activity is an output-boundary activity
+                 */
+                int superActivityOutputChannel = SuperActivity.this.getClusterOutputIndex(Pair.of(aid, outputIndex));
+                if (superActivityOutputChannel >= 0) {
+                    return recordDescProvider.getOutputRecordDescriptor(activityId, superActivityOutputChannel);
+                }
+
+                if (SuperActivity.this.getActivityMap().get(aid) != null) {
+                    /**
+                     * if the activity is an internal activity of the super activity
+                     */
+                    IConnectorDescriptor conn = getActivityOutputMap().get(aid).get(outputIndex);
+                    return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                }
+
+                /**
+                 * the following is for the case where the activity is in other SuperActivities
+                 */
+                ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+                for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+                    ActivityCluster ac = entry.getValue();
+                    for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+                        SuperActivity sa = (SuperActivity) saEntry.getValue();
+                        if (sa.getActivityMap().get(aid) != null) {
+                            List<IConnectorDescriptor> conns = sa.getActivityOutputMap().get(aid);
+                            if (conns != null && conns.size() > outputIndex) {
+                                IConnectorDescriptor conn = conns.get(outputIndex);
+                                return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                            } else {
+                                superActivityOutputChannel = sa.getClusterOutputIndex(Pair.of(aid, outputIndex));
+                                if (superActivityOutputChannel >= 0) {
+                                    return recordDescProvider.getOutputRecordDescriptor(sa.getActivityId(),
+                                            superActivityOutputChannel);
+                                }
+                            }
+                        }
+                    }
+                }
+                return null;
+            }
+
+        };
+        return new SuperActivityOperatorNodePushable(this, startActivities, ctx, wrappedRecDescProvider, partition,
+                nPartitions);
+    }
+
+    @Override
+    public ActivityId getActivityId() {
+        return activityId;
+    }
+
+    @Override
+    public String toString() {
+        return getActivityMap().values().toString();
+    }
+}
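
One detail worth noting in the lookups above is the bounds guard in front of conns.get(...): the guard has to be conns.size() > inputIndex (respectively outputIndex), because a >= comparison admits the boundary index and get(...) then throws instead of falling through to the cluster-channel branch. A tiny demonstration of the boundary case:

    import java.util.Arrays;
    import java.util.List;

    public class BoundsGuardDemo {
        public static void main(String[] args) {
            List<String> conns = Arrays.asList("conn0", "conn1");
            int index = 2;
            // conns.size() >= index holds (2 >= 2), yet conns.get(2) would throw
            // IndexOutOfBoundsException; the safe guard is conns.size() > index
            System.out.println(conns.size() > index
                    ? conns.get(index) : "fall through to the cluster channel");
        }
    }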
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
new file mode 100644
index 0000000..eba5560
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter.runtime;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The runtime of a SuperActivity, which internally executes a DAG of one-to-one connected
+ * activities in a single thread.
+ * 
+ * @author yingyib
+ */
+public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
+    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
+    private final List<IOperatorNodePushable> operatprNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+    private final Map<ActivityId, IActivity> startActivities;
+    private final SuperActivity parent;
+    private final IHyracksTaskContext ctx;
+    private final IRecordDescriptorProvider recordDescProvider;
+    private final int partition;
+    private final int nPartitions;
+    private int inputArity = 0;
+
+    public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
+            IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        this.parent = parent;
+        this.startActivities = startActivities;
+        this.ctx = ctx;
+        this.recordDescProvider = recordDescProvider;
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+
+        /**
+         * initialize the writer-relationship for the internal DAG of operator node pushables
+         */
+        try {
+            init();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public synchronized void initialize() throws HyracksDataException {
+        /**
+         * initialize operator node pushables in the BFS order
+         */
+        for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
+            op.initialize();
+        }
+    }
+
+    public void init() throws HyracksDataException {
+        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
+        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+        List<IConnectorDescriptor> outputConnectors = null;
+
+        /**
+         * Set up the source operators
+         */
+        for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
+            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
+                    nPartitions);
+            startOperatorNodePushables.put(entry.getKey(), opPushable);
+            operatprNodePushablesBFSOrder.add(opPushable);
+            operatorNodePushables.put(entry.getKey(), opPushable);
+            inputArity += opPushable.getInputArity();
+            outputConnectors = parent.getActivityOutputMap().get(entry.getKey());
+            if (outputConnectors != null) {
+                for (IConnectorDescriptor conn : outputConnectors) {
+                    childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+                }
+            }
+        }
+
+        /**
+         * use BFS (breadth-first search) to construct the runtime execution DAG
+         */
+        while (childQueue.size() > 0) {
+            /**
+             * expand the executing activities further downstream
+             */
+            if (outputConnectors != null && outputConnectors.size() > 0) {
+                for (IConnectorDescriptor conn : outputConnectors) {
+                    if (conn != null) {
+                        childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+                    }
+                }
+            }
+
+            /**
+             * construct the source to destination information
+             */
+            Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> channel = childQueue.poll();
+            ActivityId sourceId = channel.getLeft().getLeft().getActivityId();
+            int outputChannel = channel.getLeft().getRight();
+            ActivityId destId = channel.getRight().getLeft().getActivityId();
+            int inputChannel = channel.getRight().getRight();
+            IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId);
+            IOperatorNodePushable destOp = operatorNodePushables.get(destId);
+            if (destOp == null) {
+                destOp = channel.getRight().getLeft()
+                        .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+                operatprNodePushablesBFSOrder.add(destOp);
+                operatorNodePushables.put(destId, destOp);
+            }
+
+            /**
+             * construct the dataflow connection from a producer to a consumer
+             */
+            sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+                    recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
+
+            /**
+             * traverse to the child of the current activity
+             */
+            outputConnectors = parent.getActivityOutputMap().get(destId);
+        }
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        /**
+         * de-initialize operator node pushables
+         */
+        for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
+            op.deinitialize();
+        }
+    }
+
+    @Override
+    public int getInputArity() {
+        return inputArity;
+    }
+
+    @Override
+    public synchronized void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer,
+            RecordDescriptor recordDesc) {
+        /**
+         * set the right output frame writer
+         */
+        Pair<ActivityId, Integer> activityIdOutputIndex = parent.getActivityIdOutputIndex(clusterOutputIndex);
+        IOperatorNodePushable opPushable = operatorNodePushables.get(activityIdOutputIndex.getLeft());
+        opPushable.setOutputFrameWriter(activityIdOutputIndex.getRight(), writer, recordDesc);
+    }
+
+    @Override
+    public synchronized IFrameWriter getInputFrameWriter(final int index) {
+        /**
+         * get the right IFrameWriter from the cluster input index
+         */
+        Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index);
+        IOperatorNodePushable operatorNodePushable = operatorNodePushables.get(activityIdInputIndex.getLeft());
+        IFrameWriter writer = operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
+        return writer;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return "Super Activity " + parent.getActivityMap().values().toString();
+    }
+
+}
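
init() above builds the runtime DAG breadth-first: it instantiates a pushable for every source activity, then walks outgoing edges, creates each downstream pushable exactly once, and points every producer's output frame writer at its consumer's input. The standalone sketch below shows that traversal with simplified single-output nodes (Node, wire and the activity names are illustrative, not the Hyracks API):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;

    public class WiringSketch {
        static class Node {
            final String id;
            Node downstream; // simplified single output channel
            Node(String id) { this.id = id; }
        }

        static Map<String, Node> wire(List<String> sources, Map<String, String> edges) {
            Map<String, Node> runtimes = new HashMap<String, Node>();
            Queue<String> queue = new ArrayDeque<String>();
            for (String s : sources) {
                runtimes.put(s, new Node(s)); // source runtimes are created up front
                queue.add(s);
            }
            while (!queue.isEmpty()) {
                String srcId = queue.poll();
                String dstId = edges.get(srcId);
                if (dstId == null) {
                    continue; // sink: no outgoing edge
                }
                Node dst = runtimes.get(dstId);
                if (dst == null) { // create each downstream runtime lazily, once
                    dst = new Node(dstId);
                    runtimes.put(dstId, dst);
                    queue.add(dstId);
                }
                runtimes.get(srcId).downstream = dst; // producer feeds consumer directly
            }
            return runtimes;
        }

        public static void main(String[] args) {
            Map<String, String> edges = new HashMap<String, String>();
            edges.put("scan", "filter");
            edges.put("filter", "project");
            List<String> sources = new ArrayList<String>();
            sources.add("scan");
            System.out.println(wire(sources, edges).get("scan").downstream.id); // filter
        }
    }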
diff --git a/hyracks/hyracks-cli/pom.xml b/hyracks/hyracks-cli/pom.xml
index 7bb6123..a11c28b 100644
--- a/hyracks/hyracks-cli/pom.xml
+++ b/hyracks/hyracks-cli/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
       <plugin>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index c7eedb3..934dbb2 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -17,6 +17,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 096f2e1..08f013d 100644
--- a/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index 9ecc083..7a68467 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -17,6 +17,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-data/hyracks-data-std/pom.xml b/hyracks/hyracks-data/hyracks-data-std/pom.xml
index 3f051f9..d676088 100644
--- a/hyracks/hyracks-data/hyracks-data-std/pom.xml
+++ b/hyracks/hyracks-data/hyracks-data-std/pom.xml
@@ -18,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-dataflow-common/pom.xml b/hyracks/hyracks-dataflow-common/pom.xml
index a0ffb66..34cb096 100644
--- a/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks/hyracks-dataflow-common/pom.xml
@@ -17,6 +17,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFamily.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFamily.java
index 51645c4..ec3c2be 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFamily.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFamily.java
@@ -52,10 +52,10 @@
                     h += fh;
                 }
                 if (h < 0) {
-                    h = -h;
+                    h = -(h+1);
                 }
                 return h % nParts;
             }
         };
     }
-}
\ No newline at end of file
+}
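
The one-line change above fixes an integer-overflow corner case rather than a stylistic issue: in Java, negating Integer.MIN_VALUE overflows back to Integer.MIN_VALUE, so the old h = -h could leave h negative and make h % nParts return an invalid (negative) partition index. Writing h = -(h + 1) maps MIN_VALUE to MAX_VALUE without overflow, at the cost of shifting the other negative values by one, which is harmless for partitioning. A small demonstration (class name hypothetical):

public class NegationOverflowDemo {
    public static void main(String[] args) {
        int nParts = 3;
        int h = Integer.MIN_VALUE; // a possible sum of per-field hash codes

        // Old logic: -Integer.MIN_VALUE overflows back to Integer.MIN_VALUE,
        // so the result of the modulo is still negative.
        int oldH = -h;
        System.out.println(oldH % nParts); // prints -2: invalid partition

        // New logic: -(h + 1) maps MIN_VALUE to MAX_VALUE without overflow,
        // so the modulo lands in [0, nParts).
        int newH = -(h + 1);
        System.out.println(newH % nParts); // prints 1: valid partition
    }
}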
diff --git a/hyracks/hyracks-dataflow-hadoop/pom.xml b/hyracks/hyracks-dataflow-hadoop/pom.xml
index f9d5153..57b59c7 100644
--- a/hyracks/hyracks-dataflow-hadoop/pom.xml
+++ b/hyracks/hyracks-dataflow-hadoop/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-hadoop</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>hyracks-dataflow-hadoop</name>
 
   <parent>
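
Note on the recurring POM cleanup: removing the explicit groupId and version elements, as in this module and several below, is safe because a Maven child POM inherits both from its parent declaration; the deleted lines were redundant and would otherwise have to be edited on every version bump.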
diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index bf27d20..7e1a9a5 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-std</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>hyracks-dataflow-std</name>
 
   <parent>
@@ -20,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-documentation/pom.xml b/hyracks/hyracks-documentation/pom.xml
index ed24adb..7aedd57 100644
--- a/hyracks/hyracks-documentation/pom.xml
+++ b/hyracks/hyracks-documentation/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-documentation</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>hyracks-documentation</name>
 
   <parent>
diff --git a/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml b/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml
index 6350054..792cedd 100644
--- a/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
   <artifactId>btreeapp</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>btreeapp</name>
 
   <parent>
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
index ba296ac..ebca323 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
@@ -39,6 +39,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
       <plugin>
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml b/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
index 8e90606..ddb5b39 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
@@ -43,6 +43,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
index 70c663d..b1c0d41 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.compat</groupId>
   <artifactId>hadoopcompatapp</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>hadoopcompatapp</name>
 
   <parent>
@@ -149,6 +148,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
       <plugin>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
index dd96db8..054f97d 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.compat</groupId>
   <artifactId>hadoopcompatclient</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>hadoopcompatclient</name>
 
   <parent>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
index 1e029be..d4b9abb 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.compat</groupId>
   <artifactId>hadoopcompathelper</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>hadoopcompathelper</name>
 
   <parent>
@@ -34,6 +33,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
index a2cf5ae..16787ca 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>hadoop-compat-example</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>hadoop-compat-example</name>
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index e31af75..86813ef 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -18,6 +18,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-examples/pom.xml b/hyracks/hyracks-examples/pom.xml
index 8ce8108..551e2be 100644
--- a/hyracks/hyracks-examples/pom.xml
+++ b/hyracks/hyracks-examples/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-examples</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>hyracks-examples</name>
 
diff --git a/hyracks/hyracks-examples/text-example/pom.xml b/hyracks/hyracks-examples/text-example/pom.xml
index ba8649e..0476bb3 100644
--- a/hyracks/hyracks-examples/text-example/pom.xml
+++ b/hyracks/hyracks-examples/text-example/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>text-example</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>text-example</name>
 
diff --git a/hyracks/hyracks-examples/text-example/textapp/pom.xml b/hyracks/hyracks-examples/text-example/textapp/pom.xml
index 8ac69d8..27dc508 100644
--- a/hyracks/hyracks-examples/text-example/textapp/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textapp/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.text</groupId>
   <artifactId>textapp</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>textapp</name>
 
   <parent>
@@ -144,6 +143,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
       <plugin>
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
index 901f1fb2..796c95c 100644
--- a/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.text</groupId>
   <artifactId>textclient</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>textclient</name>
 
   <parent>
@@ -35,6 +34,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
       <plugin>
diff --git a/hyracks/hyracks-examples/text-example/texthelper/pom.xml b/hyracks/hyracks-examples/text-example/texthelper/pom.xml
index 8e32c8c..121c785 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/pom.xml
+++ b/hyracks/hyracks-examples/text-example/texthelper/pom.xml
@@ -38,6 +38,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-examples/tpch-example/pom.xml b/hyracks/hyracks-examples/tpch-example/pom.xml
index b237c9b..9951792 100644
--- a/hyracks/hyracks-examples/tpch-example/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>tpch-example</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>tpch-example</name>
 
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index 6b3f603..1e5e3c0 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -31,6 +31,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
       <plugin>
diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml
index 3426293..426a333 100644
--- a/hyracks/hyracks-hadoop-compat/pom.xml
+++ b/hyracks/hyracks-hadoop-compat/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
       <plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
index e74d610..c8634f9 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -20,6 +20,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
index 27a1e33..c28b3a2 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -19,6 +19,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index fccfec4..1e51a04 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -20,6 +20,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/hyracks/hyracks-ipc/pom.xml b/hyracks/hyracks-ipc/pom.xml
index cb0de08..f12e10c 100644
--- a/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks/hyracks-ipc/pom.xml
@@ -17,6 +17,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
index 50c05da..64a3616 100644
--- a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
+++ b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
@@ -19,6 +19,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-net/pom.xml b/hyracks/hyracks-net/pom.xml
index a079306..d2962c4 100644
--- a/hyracks/hyracks-net/pom.xml
+++ b/hyracks/hyracks-net/pom.xml
@@ -17,6 +17,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-server/pom.xml b/hyracks/hyracks-server/pom.xml
index 6c6640e..66d1c8d 100644
--- a/hyracks/hyracks-server/pom.xml
+++ b/hyracks/hyracks-server/pom.xml
@@ -17,6 +17,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
       <plugin>
diff --git a/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks/hyracks-storage-am-btree/pom.xml
index 118d46d..a1f362e 100644
--- a/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks/hyracks-storage-am-btree/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-storage-am-common/pom.xml b/hyracks/hyracks-storage-am-common/pom.xml
index 0b32733..93f2e90 100644
--- a/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks/hyracks-storage-am-common/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-storage-am-invertedindex/pom.xml b/hyracks/hyracks-storage-am-invertedindex/pom.xml
index a647e9d..b8a7b7c 100644
--- a/hyracks/hyracks-storage-am-invertedindex/pom.xml
+++ b/hyracks/hyracks-storage-am-invertedindex/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-storage-am-rtree/pom.xml b/hyracks/hyracks-storage-am-rtree/pom.xml
index 61620ec..fd432ff 100644
--- a/hyracks/hyracks-storage-am-rtree/pom.xml
+++ b/hyracks/hyracks-storage-am-rtree/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-storage-common/pom.xml b/hyracks/hyracks-storage-common/pom.xml
index 289171a..834c7d3 100644
--- a/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks/hyracks-storage-common/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-test-support/pom.xml b/hyracks/hyracks-test-support/pom.xml
index 25a5378..0cc8762 100644
--- a/hyracks/hyracks-test-support/pom.xml
+++ b/hyracks/hyracks-test-support/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
index 7b03a71..bf8c478 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
index 2cf6ce2..03490c7 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
@@ -21,6 +21,7 @@
           <source>1.6</source>
           <target>1.6</target>
           <encoding>UTF-8</encoding>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
index ea86042..ba6da6d 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
index bd10e13..d7b6288 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
@@ -20,6 +20,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 0643804..2d1659d 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -23,6 +23,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 0908829..57ae85e 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -67,6 +67,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index 05a1508..63fb33e 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -24,6 +24,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 3f3ba00..9815cd1 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -25,6 +25,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 107f059..4de8290 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -25,6 +25,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index ce8f985..e8e04a1 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -19,6 +19,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 82be6b9..22219e5 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -24,6 +24,7 @@
 				<configuration>
 					<source>1.6</source>
 					<target>1.6</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>