merged kisskys/master_flush_optimization_and_bug_fix to master
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index ab3516a..70306d6 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -271,6 +271,7 @@
         // Re-infer all types, so that, e.g., the effect of not-is-null is
         // propagated.
         prepareForJobGenRewrites.add(new ReinferAllTypesRule());
+        prepareForJobGenRewrites.add(new SetExecutionModeRule());
         return prepareForJobGenRewrites;
     }
 
diff --git a/asterix-app/src/test/resources/optimizerts/queries/noncollocated.aql b/asterix-app/src/test/resources/optimizerts/queries/collocated.aql
similarity index 78%
rename from asterix-app/src/test/resources/optimizerts/queries/noncollocated.aql
rename to asterix-app/src/test/resources/optimizerts/queries/collocated.aql
index 26b5ab0..bcd0a0d 100644
--- a/asterix-app/src/test/resources/optimizerts/queries/noncollocated.aql
+++ b/asterix-app/src/test/resources/optimizerts/queries/collocated.aql
@@ -16,15 +16,11 @@
   lottery_numbers: {{int32}}
 }
 
-create nodegroup group1 if not exists on nc1, nc2;
-
-create nodegroup group2 if not exists on nc2;
-
 create dataset Users(UserType) 
-  primary key uid on group1;
+  primary key uid;
 
 create dataset Visitors(VisitorType) 
-  primary key vid on group2;
+  primary key vid;
 
 
 write output to nc1:"/tmp/fuzzy1.adm";
diff --git a/asterix-app/src/test/resources/optimizerts/results/noncollocated.plan b/asterix-app/src/test/resources/optimizerts/results/collocated.plan
similarity index 88%
rename from asterix-app/src/test/resources/optimizerts/results/noncollocated.plan
rename to asterix-app/src/test/resources/optimizerts/results/collocated.plan
index e92a84c..116ff8a 100644
--- a/asterix-app/src/test/resources/optimizerts/results/noncollocated.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/collocated.plan
@@ -5,14 +5,14 @@
         -- STREAM_PROJECT  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- HYBRID_HASH_JOIN [$$10][$$11]  |PARTITIONED|
-              -- HASH_PARTITION_EXCHANGE [$$10]  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- DATASOURCE_SCAN  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-              -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan b/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
index 0c7b95d..8ae274a 100644
--- a/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
@@ -73,8 +73,8 @@
                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                               -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                       -- HASH_PARTITION_EXCHANGE [$$60]  |PARTITIONED|
-                                        -- STREAM_PROJECT  |UNPARTITIONED|
-                                          -- ASSIGN  |UNPARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                               -- SPLIT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/runtimets/queries/types/record01/record01.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/types/record01/record01.1.ddl.aql
new file mode 100644
index 0000000..faae040
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/types/record01/record01.1.ddl.aql
@@ -0,0 +1,5 @@
+drop dataverse local if exists;
+create dataverse local;
+use dataverse local;
+create type ttype as { "id" : int32 } ;
+create dataset dset (ttype) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/types/record01/record01.1.adm b/asterix-app/src/test/resources/runtimets/results/types/record01/record01.1.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/types/record01/record01.1.adm
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index eadf7fb..1e65071 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -4442,4 +4442,11 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="types">
+    <test-case FilePath="types">
+      <compilation-unit name="record01">
+        <output-dir compare="Text">record01</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 84d46f5..fdcd86e 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -96,6 +96,49 @@
         }  
         return s.substring(1).trim();
     }
+    
+    private static IRecordFieldDataGen parseFieldDataGen(String hint) throws ParseException {
+      IRecordFieldDataGen rfdg = null;
+      String splits[] = hint.split(" +");
+      if (splits[0].equals(VAL_FILE_HINT)) { 
+        File[] valFiles = new File[splits.length - 1];
+        for (int k=1; k<splits.length; k++) {
+          valFiles[k-1] = new File(splits[k]);
+        } 
+        rfdg = new FieldValFileDataGen(valFiles);
+      } else if (splits[0].equals(VAL_FILE_SAME_INDEX_HINT)) {
+        rfdg = new FieldValFileSameIndexDataGen(new File(splits[1]), splits[2]);
+      } else if (splits[0].equals(LIST_VAL_FILE_HINT)) {
+        rfdg = new ListValFileDataGen(new File(splits[1]), Integer.parseInt(splits[2]), Integer.parseInt(splits[3]));
+      } else if (splits[0].equals(LIST_HINT)) {
+        rfdg = new ListDataGen(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]));
+      } else if (splits[0].equals(INTERVAL_HINT)) {
+        FieldIntervalDataGen.ValueType vt;
+        if (splits[1].equals("int")) {
+          vt = FieldIntervalDataGen.ValueType.INT;
+        } else if (splits[1].equals("long")) {
+          vt = FieldIntervalDataGen.ValueType.LONG;
+        } else if (splits[1].equals("float")) {
+          vt = FieldIntervalDataGen.ValueType.FLOAT;
+        } else if (splits[1].equals("double")) {
+          vt = FieldIntervalDataGen.ValueType.DOUBLE;
+        } else {
+          throw new ParseException("Unknown type for interval data gen: " + splits[1]);
+        }
+        rfdg = new FieldIntervalDataGen(vt, splits[2], splits[3]); 
+      } else if (splits[0].equals(INSERT_RAND_INT_HINT)) {
+        rfdg = new InsertRandIntDataGen(splits[1], splits[2]);
+      } else if (splits[0].equals(DATE_BETWEEN_YEARS_HINT)) {
+        rfdg = new DateBetweenYearsDataGen(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]));
+      } else if (splits[0].equals(DATETIME_BETWEEN_YEARS_HINT)) {
+        rfdg = new DatetimeBetweenYearsDataGen(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]));
+      } else if (splits[0].equals(DATETIME_ADD_RAND_HOURS_HINT)) {
+        rfdg = new DatetimeAddRandHoursDataGen(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]), splits[3]);
+      } else if (splits[0].equals(AUTO_HINT)) {
+        rfdg = new AutoDataGen(splits[1]);
+      }
+      return rfdg;
+    }
 
     public AQLParser(String s){
         this(new StringReader(s));
@@ -893,63 +936,20 @@
 
 void RecordField(RecordTypeDefinition recType) throws ParseException:
 {
-        String fieldName;
-        TypeExpression type = null; 
-        boolean nullable = false;
+  String fieldName;
+  TypeExpression type = null; 
+  boolean nullable = false;
 }
 {
-      fieldName = Identifier()
-      	{
-	     fieldName = token.image;	     	     
-         String hint = getHint(token);
-         IRecordFieldDataGen rfdg = null;
-         if (hint != null) { 
-           String splits[] = hint.split(" +");
-           if (splits[0].equals(VAL_FILE_HINT)) { 
-             File[] valFiles = new File[splits.length - 1];
-             for (int k=1; k<splits.length; k++) {
-               valFiles[k-1] = new File(splits[k]);
-             } 
-             rfdg = new FieldValFileDataGen(valFiles);
-           } else if (splits[0].equals(VAL_FILE_SAME_INDEX_HINT)) {
-             rfdg = new FieldValFileSameIndexDataGen(new File(splits[1]), splits[2]);
-           } else if (splits[0].equals(LIST_VAL_FILE_HINT)) {
-             rfdg = new ListValFileDataGen(new File(splits[1]), Integer.parseInt(splits[2]), Integer.parseInt(splits[3]));
-           } else if (splits[0].equals(LIST_HINT)) {
-             rfdg = new ListDataGen(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]));
-           } else if (splits[0].equals(INTERVAL_HINT)) {
-             FieldIntervalDataGen.ValueType vt;
-             if (splits[1].equals("int")) {
-               vt = FieldIntervalDataGen.ValueType.INT;
-             } else if (splits[1].equals("long")) {
-               vt = FieldIntervalDataGen.ValueType.LONG;
-             } else if (splits[1].equals("float")) {
-               vt = FieldIntervalDataGen.ValueType.FLOAT;
-             } else if (splits[1].equals("double")) {
-               vt = FieldIntervalDataGen.ValueType.DOUBLE;
-             } else {
-               throw new ParseException("Unknown type for interval data gen: " + splits[1]);
-             }
-             rfdg = new FieldIntervalDataGen(vt, splits[2], splits[3]); 
-           } else if (splits[0].equals(INSERT_RAND_INT_HINT)) {
-             rfdg = new InsertRandIntDataGen(splits[1], splits[2]);
-           } else if (splits[0].equals(DATE_BETWEEN_YEARS_HINT)) {
-             rfdg = new DateBetweenYearsDataGen(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]));
-           } else if (splits[0].equals(DATETIME_BETWEEN_YEARS_HINT)) {
-             rfdg = new DatetimeBetweenYearsDataGen(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]));
-           } else if (splits[0].equals(DATETIME_ADD_RAND_HOURS_HINT)) {
-             rfdg = new DatetimeAddRandHoursDataGen(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]), splits[3]);
-           } else if (splits[0].equals(AUTO_HINT)) {
-              rfdg = new AutoDataGen(splits[1]);
-           }
-         }
-	    }
-	  ":"
-	  ( type =  TypeExpr() )
-	  ("?" { nullable = true; } )?
-	  {
-	     recType.addField(fieldName, type, nullable, rfdg);
-	  }   
+  fieldName = Identifier()
+    {
+      String hint = getHint(token);
+      IRecordFieldDataGen rfdg = hint != null ? parseFieldDataGen(hint) : null;
+    }
+  ":" type =  TypeExpr() ("?" { nullable = true; } )?
+    {
+      recType.addField(fieldName, type, nullable, rfdg);
+    }   
 }
 
 TypeReferenceExpression TypeReference() throws ParseException:
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index 0ed3c78..166de00 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -32,7 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.AsterixNodeGroupDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
@@ -120,7 +120,7 @@
             schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
         }
         schemaTypes[n] = itemType;
-        domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
+        domain = new DefaultNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
     }
 
     private void initFeedDataset(IAType itemType, Dataset dataset) throws IOException {
@@ -135,7 +135,7 @@
                 schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
             }
             schemaTypes[n] = itemType;
-            domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
+            domain = new DefaultNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
         }
     }