diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index 4fef771..332b48b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -16,6 +16,7 @@
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
@@ -36,8 +37,7 @@
     private final String adapter;
     private final Map<String, String> adapterConfiguration;
     private final IAType atype;
-
-    private transient IDatasourceReadAdapter datasourceReadAdapter;
+    private  IDatasourceReadAdapter datasourceReadAdapter;
 
     public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
             IAType atype, RecordDescriptor rDesc) {
@@ -86,7 +86,7 @@
             throws HyracksDataException {
 
         try {
-            datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
+            //datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
             datasourceReadAdapter.configure(adapterConfiguration, atype);
             datasourceReadAdapter.initialize(ctx);
         } catch (Exception e) {
@@ -106,4 +106,8 @@
             }
         };
     }
+    
+    public void setDatasourceAdapter(IDatasourceReadAdapter adapterInstance){
+        this.datasourceReadAdapter = adapterInstance;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 320a29f..dd153ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -40,13 +40,13 @@
 
     protected Map<String, String> configuration;
 
-    protected AlgebricksPartitionConstraint partitionConstraint;
+    protected transient AlgebricksPartitionConstraint partitionConstraint;
 
     protected IAType atype;
 
     protected IHyracksTaskContext ctx;
 
-    protected IDataParser dataParser;
+    protected transient IDataParser dataParser;
 
     protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index b59f034..8bd2fb0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -26,15 +26,14 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.Counters.Counter;
 
 import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
-import edu.uci.ics.asterix.external.data.parser.DelimitedDataStreamParser;
 import edu.uci.ics.asterix.external.data.parser.IDataParser;
 import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
 import edu.uci.ics.asterix.om.types.ARecordType;
@@ -56,12 +55,13 @@
     private List<String> hdfsPaths;
     private String inputFormatClassName;
     private Object[] inputSplits;
-    private JobConf conf;
+    private transient JobConf conf;
     private IHyracksTaskContext ctx;
     private Reporter reporter;
     private boolean isDelimited;
     private Character delimiter;
     private InputSplitsProxy inputSplitsProxy;
+    private String parserClass;
     private static final Map<String, String> formatClassNames = new HashMap<String, String>();
 
     public static final String KEY_HDFS_URL = "hdfs";
@@ -108,7 +108,7 @@
             throw new Exception("format " + format + " not supported");
         }
 
-        String parserClass = configuration.get(KEY_PARSER);
+        parserClass = configuration.get(KEY_PARSER);
         if (parserClass == null) {
             if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
                 parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
@@ -116,18 +116,23 @@
                 parserClass = formatToParserMap.get(FORMAT_ADM);
             }
         }
+       
+    }
 
+    private void configureParser() throws Exception {
         dataParser = (IDataParser) Class.forName(parserClass).newInstance();
         dataParser.configure(configuration);
     }
 
     private void configurePartitionConstraint() throws Exception {
-        InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
-        inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
-        partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
-        hdfsPaths = new ArrayList<String>();
-        for (String hdfsPath : configuration.get(KEY_HDFS_PATH).split(",")) {
-            hdfsPaths.add(hdfsPath);
+        if (inputSplitsProxy == null) {
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
+            inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
+            partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
+            hdfsPaths = new ArrayList<String>();
+            for (String hdfsPath : configuration.get(KEY_HDFS_PATH).split(",")) {
+                hdfsPaths.add(hdfsPath);
+            }
         }
     }
 
@@ -172,6 +177,7 @@
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
         inputSplits = inputSplitsProxy.toInputSplits(conf);
+        configureParser();
         dataParser.initialize((ARecordType) atype, ctx);
         reporter = new Reporter() {
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 2449db0..385ffe5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -34,7 +34,8 @@
 public class NCFileSystemAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
 
     private static final long serialVersionUID = -4154256369973615710L;
-    protected FileSplit[] fileSplits;
+    private FileSplit[] fileSplits;
+    private String parserClass;
 
     public class Constants {
         public static final String KEY_SPLITS = "path";
@@ -69,6 +70,7 @@
     @Override
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
+        configureDataParser();
         dataParser.initialize((ARecordType) atype, ctx);
     }
 
@@ -101,20 +103,22 @@
     }
 
     private void configureFileSplits(String[] splits) {
-        fileSplits = new FileSplit[splits.length];
-        String nodeName;
-        String nodeLocalPath;
-        int count = 0;
-        for (String splitPath : splits) {
-            nodeName = splitPath.split(":")[0];
-            nodeLocalPath = splitPath.split("://")[1];
-            FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
-            fileSplits[count++] = fileSplit;
+        if (fileSplits == null) {
+            fileSplits = new FileSplit[splits.length];
+            String nodeName;
+            String nodeLocalPath;
+            int count = 0;
+            for (String splitPath : splits) {
+                nodeName = splitPath.split(":")[0];
+                nodeLocalPath = splitPath.split("://")[1];
+                FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+                fileSplits[count++] = fileSplit;
+            }
         }
     }
 
     protected void configureFormat() throws Exception {
-        String parserClass = configuration.get(Constants.KEY_PARSER);
+        parserClass = configuration.get(Constants.KEY_PARSER);
         if (parserClass == null) {
             if (Constants.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
                 parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
@@ -124,6 +128,10 @@
                 throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
             }
         }
+
+    }
+
+    private void configureDataParser() throws Exception {
         dataParser = (IDataParser) Class.forName(parserClass).newInstance();
         dataParser.configure(configuration);
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 2d85fe3..17deaaf 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.asterix.dataflow.base.IAsterixApplicationContextInfo;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
 import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
 import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
 import edu.uci.ics.asterix.feed.comm.IFeedMessage;
 import edu.uci.ics.asterix.feed.mgmt.FeedId;
@@ -87,846 +88,1025 @@
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory;
 
-public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
-    private final long txnId;
-    private boolean isWriteTransaction;
-    private final AqlCompiledMetadataDeclarations metadata;
+public class AqlMetadataProvider implements
+		IMetadataProvider<AqlSourceId, String> {
+	private final long txnId;
+	private boolean isWriteTransaction;
+	private final AqlCompiledMetadataDeclarations metadata;
 
-    public AqlMetadataProvider(long txnId, boolean isWriteTransaction, AqlCompiledMetadataDeclarations metadata) {
-        this.txnId = txnId;
-        this.isWriteTransaction = isWriteTransaction;
-        this.metadata = metadata;
-    }
+	public AqlMetadataProvider(long txnId, boolean isWriteTransaction,
+			AqlCompiledMetadataDeclarations metadata) {
+		this.txnId = txnId;
+		this.isWriteTransaction = isWriteTransaction;
+		this.metadata = metadata;
+	}
 
-    @Override
-    public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
-        AqlSourceId aqlId = (AqlSourceId) id;
-        return lookupSourceInMetadata(metadata, aqlId);
-    }
+	@Override
+	public AqlDataSource findDataSource(AqlSourceId id)
+			throws AlgebricksException {
+		AqlSourceId aqlId = (AqlSourceId) id;
+		return lookupSourceInMetadata(metadata, aqlId);
+	}
 
-    public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
-        return metadata;
-    }
+	public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
+		return metadata;
+	}
 
-    public boolean isWriteTransaction() {
-        return isWriteTransaction;
-    }
+	public boolean isWriteTransaction() {
+		return isWriteTransaction;
+	}
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
-            IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
-            List<LogicalVariable> projectVariables, boolean projectPushed, JobGenContext context,
-            JobSpecification jobSpec) throws AlgebricksException {
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId().getDatasetName());
-        if (adecl == null) {
-            throw new AlgebricksException("Unknown dataset " + dataSource.getId().getDatasetName());
-        }
-        switch (adecl.getDatasetType()) {
-            case FEED:
-                if (dataSource instanceof ExternalFeedDataSource) {
-                    return buildExternalDatasetScan(jobSpec, adecl, dataSource);
-                } else {
-                    return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
-                }
+	@Override
+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
+			IDataSource<AqlSourceId> dataSource,
+			List<LogicalVariable> scanVariables,
+			List<LogicalVariable> projectVariables, boolean projectPushed,
+			JobGenContext context, JobSpecification jobSpec)
+			throws AlgebricksException {
+		AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId()
+				.getDatasetName());
+		if (adecl == null) {
+			throw new AlgebricksException("Unknown dataset "
+					+ dataSource.getId().getDatasetName());
+		}
+		switch (adecl.getDatasetType()) {
+		case FEED:
+			if (dataSource instanceof ExternalFeedDataSource) {
+				return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+			} else {
+				return buildInternalDatasetScan(jobSpec, adecl, dataSource,
+						context);
+			}
 
-            case INTERNAL: {
-                return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
-            }
+		case INTERNAL: {
+			return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
+		}
 
-            case EXTERNAL: {
-                return buildExternalDatasetScan(jobSpec, adecl, dataSource);
-            }
+		case EXTERNAL: {
+			return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+		}
 
-            default: {
-                throw new IllegalArgumentException();
-            }
-        }
-    }
+		default: {
+			throw new IllegalArgumentException();
+		}
+		}
+	}
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
-            AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource, JobGenContext context)
-            throws AlgebricksException {
-        AqlSourceId asid = dataSource.getId();
-        String datasetName = asid.getDatasetName();
-        String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
+	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(
+			JobSpecification jobSpec, AqlCompiledDatasetDecl acedl,
+			IDataSource<AqlSourceId> dataSource, JobGenContext context)
+			throws AlgebricksException {
+		AqlSourceId asid = dataSource.getId();
+		String datasetName = asid.getDatasetName();
+		String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
 
-        try {
-            return buildBtreeRuntime(metadata, context, jobSpec, datasetName, acedl, indexName, null, null, true, true);
-        } catch (AlgebricksException e) {
-            throw new AlgebricksException(e);
-        }
-    }
+		try {
+			return buildBtreeRuntime(metadata, context, jobSpec, datasetName,
+					acedl, indexName, null, null, true, true);
+		} catch (AlgebricksException e) {
+			throw new AlgebricksException(e);
+		}
+	}
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
-            AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
-        String itemTypeName = acedl.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(itemTypeName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(
+			JobSpecification jobSpec, AqlCompiledDatasetDecl acedl,
+			IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+		String itemTypeName = acedl.getItemTypeName();
+		IAType itemType;
+		try {
+			itemType = metadata.findType(itemTypeName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        if (dataSource instanceof ExternalFeedDataSource) {
-            AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
-                    .getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
+		if (dataSource instanceof ExternalFeedDataSource) {
+			AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
+					.getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
 
-            return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(), acedl.getName(), itemType, acfdd,
-                    metadata.getFormat());
-        } else {
-            return buildExternalDataScannerRuntime(jobSpec, itemType,
-                    (AqlCompiledExternalDatasetDetails) acedl.getAqlCompiledDatasetDetails(), metadata.getFormat());
-        }
-    }
+			return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(),
+					acedl.getName(), itemType, acfdd, metadata.getFormat());
+		} else {
+			return buildExternalDataScannerRuntime(jobSpec, itemType,
+					(AqlCompiledExternalDatasetDetails) acedl
+							.getAqlCompiledDatasetDetails(), metadata
+							.getFormat());
+		}
+	}
 
-    @SuppressWarnings("rawtypes")
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, AqlCompiledExternalDatasetDetails decl, IDataFormat format)
-            throws AlgebricksException {
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Can only scan datasets of records.");
-        }
+	@SuppressWarnings("rawtypes")
+	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
+			JobSpecification jobSpec, IAType itemType,
+			AqlCompiledExternalDatasetDetails decl, IDataFormat format)
+			throws AlgebricksException {
+		if (itemType.getTypeTag() != ATypeTag.RECORD) {
+			throw new AlgebricksException("Can only scan datasets of records.");
+		}
 
-        IDatasourceAdapter adapter;
-        try {
-            adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("unable to load the adapter class " + e);
-        }
+		IDatasourceReadAdapter adapter;
+		try {
+			adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter())
+					.newInstance();
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new AlgebricksException("unable to load the adapter class "
+					+ e);
+		}
 
-        if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
-                IDatasourceAdapter.AdapterType.READ_WRITE))) {
-            throw new AlgebricksException("external dataset does not support read");
-        }
-        ARecordType rt = (ARecordType) itemType;
-        try {
-            adapter.configure(decl.getProperties(), itemType);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("unable to configure the datasource adapter " + e);
-        }
+		if (!(adapter.getAdapterType().equals(
+				IDatasourceAdapter.AdapterType.READ) || adapter
+				.getAdapterType().equals(
+						IDatasourceAdapter.AdapterType.READ_WRITE))) {
+			throw new AlgebricksException(
+					"external dataset does not support read");
+		}
+		ARecordType rt = (ARecordType) itemType;
+		try {
+			adapter.configure(decl.getProperties(), itemType);
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new AlgebricksException(
+					"unable to configure the datasource adapter " + e);
+		}
 
-        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+		ISerializerDeserializer payloadSerde = format.getSerdeProvider()
+				.getSerializerDeserializer(itemType);
+		RecordDescriptor scannerDesc = new RecordDescriptor(
+				new ISerializerDeserializer[] { payloadSerde });
 
-        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
-                decl.getAdapter(), decl.getProperties(), rt, scannerDesc);
-        AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
-    }
+		ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(
+				jobSpec, decl.getAdapter(), decl.getProperties(), rt,
+				scannerDesc);
+		dataScanner.setDatasourceAdapter(adapter);
+		AlgebricksPartitionConstraint constraint = adapter
+				.getPartitionConstraint();
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				dataScanner, constraint);
+	}
 
-    @SuppressWarnings("rawtypes")
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, IParseFileSplitsDecl decl, IDataFormat format)
-            throws AlgebricksException {
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Can only scan datasets of records.");
-        }
-        ARecordType rt = (ARecordType) itemType;
-        ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
-        FileSplit[] splits = decl.getSplits();
-        IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(splits);
-        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-        IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec, scannerSplitProvider, tupleParser,
-                scannerDesc);
-        String[] locs = new String[splits.length];
-        for (int i = 0; i < splits.length; i++) {
-            locs[i] = splits[i].getNodeName();
-        }
-        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(locs);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
-    }
+	@SuppressWarnings("rawtypes")
+	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
+			JobSpecification jobSpec, IAType itemType,
+			IParseFileSplitsDecl decl, IDataFormat format)
+			throws AlgebricksException {
+		if (itemType.getTypeTag() != ATypeTag.RECORD) {
+			throw new AlgebricksException("Can only scan datasets of records.");
+		}
+		ARecordType rt = (ARecordType) itemType;
+		ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
+		FileSplit[] splits = decl.getSplits();
+		IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(
+				splits);
+		ISerializerDeserializer payloadSerde = format.getSerdeProvider()
+				.getSerializerDeserializer(itemType);
+		RecordDescriptor scannerDesc = new RecordDescriptor(
+				new ISerializerDeserializer[] { payloadSerde });
+		IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec,
+				scannerSplitProvider, tupleParser, scannerDesc);
+		String[] locs = new String[splits.length];
+		for (int i = 0; i < splits.length; i++) {
+			locs[i] = splits[i].getNodeName();
+		}
+		AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(
+				locs);
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				scanner, apc);
+	}
 
-    @SuppressWarnings("rawtypes")
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
-            JobSpecification jobSpec, String dataverse, String dataset, IAType itemType,
-            AqlCompiledFeedDatasetDetails decl, IDataFormat format) throws AlgebricksException {
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Can only consume records.");
-        }
-        IDatasourceAdapter adapter;
-        try {
-            adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("unable to load the adapter class " + e);
-        }
+	@SuppressWarnings("rawtypes")
+	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
+			JobSpecification jobSpec, String dataverse, String dataset,
+			IAType itemType, AqlCompiledFeedDatasetDetails decl,
+			IDataFormat format) throws AlgebricksException {
+		if (itemType.getTypeTag() != ATypeTag.RECORD) {
+			throw new AlgebricksException("Can only consume records.");
+		}
+		IDatasourceAdapter adapter;
+		try {
+			adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter())
+					.newInstance();
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new AlgebricksException("unable to load the adapter class "
+					+ e);
+		}
 
-        ARecordType rt = (ARecordType) itemType;
-        try {
-            adapter.configure(decl.getProperties(), itemType);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("unable to configure the datasource adapter " + e);
-        }
+		ARecordType rt = (ARecordType) itemType;
+		try {
+			adapter.configure(decl.getProperties(), itemType);
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new AlgebricksException(
+					"unable to configure the datasource adapter " + e);
+		}
 
-        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-        RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+		ISerializerDeserializer payloadSerde = format.getSerdeProvider()
+				.getSerializerDeserializer(itemType);
+		RecordDescriptor feedDesc = new RecordDescriptor(
+				new ISerializerDeserializer[] { payloadSerde });
 
-        FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(dataverse,
-                dataset), decl.getAdapter(), decl.getProperties(), rt, feedDesc);
+		FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(
+				jobSpec, new FeedId(dataverse, dataset), decl.getAdapter(),
+				decl.getProperties(), rt, feedDesc);
 
-        AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
-    }
+		AlgebricksPartitionConstraint constraint = adapter
+				.getPartitionConstraint();
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				feedIngestor, constraint);
+	}
 
-    @SuppressWarnings("rawtypes")
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
-            JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata, AqlCompiledFeedDatasetDetails decl,
-            String dataverse, String dataset, List<IFeedMessage> feedMessages) throws AlgebricksException {
+	@SuppressWarnings("rawtypes")
+	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
+			JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata,
+			AqlCompiledFeedDatasetDetails decl, String dataverse,
+			String dataset, List<IFeedMessage> feedMessages)
+			throws AlgebricksException {
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-        try {
-            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset, dataset);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+		try {
+			spPc = metadata
+					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+							dataset, dataset);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
-                feedMessages);
+		FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(
+				jobSpec, dataverse, dataset, feedMessages);
 
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, spPc.second);
-    }
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				feedMessenger, spPc.second);
+	}
 
-    @SuppressWarnings("rawtypes")
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
-            AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
-            String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
-            int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
-        String itemTypeName = ddecl.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(itemTypeName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+	@SuppressWarnings("rawtypes")
+	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
+			AqlCompiledMetadataDeclarations metadata, JobGenContext context,
+			JobSpecification jobSpec, String datasetName,
+			AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
+			int[] highKeyFields, boolean lowKeyInclusive,
+			boolean highKeyInclusive) throws AlgebricksException {
+		String itemTypeName = ddecl.getItemTypeName();
+		IAType itemType;
+		try {
+			itemType = metadata.findType(itemTypeName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        boolean isSecondary = true;
-        AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
+		boolean isSecondary = true;
+		AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
 
-        if (primIdxDecl != null) {
-            isSecondary = !indexName.equals(primIdxDecl.getIndexName());
-        }
+		if (primIdxDecl != null) {
+			isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+		}
 
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
-        ISerializerDeserializer[] recordFields;
-        IBinaryComparatorFactory[] comparatorFactories;
-        ITypeTraits[] typeTraits;
-        int numSecondaryKeys = 0;
-        int i = 0;
-        if (isSecondary) {
-            AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
-            if (cid == null) {
-                throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
-                        + datasetName);
-            }
-            List<String> secondaryKeyFields = cid.getFieldExprs();
-            numSecondaryKeys = secondaryKeyFields.size();
-            int numKeys = numSecondaryKeys + numPrimaryKeys;
-            recordFields = new ISerializerDeserializer[numKeys];
-            typeTraits = new ITypeTraits[numKeys];
-            // comparatorFactories = new
-            // IBinaryComparatorFactory[numSecondaryKeys];
-            comparatorFactories = new IBinaryComparatorFactory[numKeys];
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-            ARecordType recType = (ARecordType) itemType;
-            for (i = 0; i < numSecondaryKeys; i++) {
-                IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), recType);
-                ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
-                        .getSerializerDeserializer(keyType);
-                recordFields[i] = keySerde;
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                        keyType, OrderKind.ASC);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-        } else {
-            recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-            comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-            typeTraits = new ITypeTraits[numPrimaryKeys + 1];
-            ISerializerDeserializer payloadSerde = metadata.getFormat().getSerdeProvider()
-                    .getSerializerDeserializer(itemType);
-            recordFields[numPrimaryKeys] = payloadSerde;
-            typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        }
+		int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl)
+				.size();
+		ISerializerDeserializer[] recordFields;
+		IBinaryComparatorFactory[] comparatorFactories;
+		ITypeTraits[] typeTraits;
+		int numSecondaryKeys = 0;
+		int i = 0;
+		if (isSecondary) {
+			AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+					ddecl, indexName);
+			if (cid == null) {
+				throw new AlgebricksException(
+						"Code generation error: no index " + indexName
+								+ " for dataset " + datasetName);
+			}
+			List<String> secondaryKeyFields = cid.getFieldExprs();
+			numSecondaryKeys = secondaryKeyFields.size();
+			int numKeys = numSecondaryKeys + numPrimaryKeys;
+			recordFields = new ISerializerDeserializer[numKeys];
+			typeTraits = new ITypeTraits[numKeys];
+			// comparatorFactories = new
+			// IBinaryComparatorFactory[numSecondaryKeys];
+			comparatorFactories = new IBinaryComparatorFactory[numKeys];
+			if (itemType.getTypeTag() != ATypeTag.RECORD) {
+				throw new AlgebricksException(
+						"Only record types can be indexed.");
+			}
+			ARecordType recType = (ARecordType) itemType;
+			for (i = 0; i < numSecondaryKeys; i++) {
+				IAType keyType = AqlCompiledIndexDecl.keyFieldType(
+						secondaryKeyFields.get(i), recType);
+				ISerializerDeserializer keySerde = metadata.getFormat()
+						.getSerdeProvider().getSerializerDeserializer(keyType);
+				recordFields[i] = keySerde;
+				comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+						.getBinaryComparatorFactory(keyType, OrderKind.ASC);
+				typeTraits[i] = AqlTypeTraitProvider.INSTANCE
+						.getTypeTrait(keyType);
+			}
+		} else {
+			recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+			comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+			typeTraits = new ITypeTraits[numPrimaryKeys + 1];
+			ISerializerDeserializer payloadSerde = metadata.getFormat()
+					.getSerdeProvider().getSerializerDeserializer(itemType);
+			recordFields[numPrimaryKeys] = payloadSerde;
+			typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
+					.getTypeTrait(itemType);
+		}
 
-        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(ddecl)) {
-            IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
-                    .getSerializerDeserializer(keyType);
-            recordFields[i] = keySerde;
-            // if (!isSecondary) {
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    OrderKind.ASC);
-            // }
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
-        }
+		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+				.getPartitioningFunctions(ddecl)) {
+			IAType keyType = evalFactoryAndType.third;
+			ISerializerDeserializer keySerde = metadata.getFormat()
+					.getSerdeProvider().getSerializerDeserializer(keyType);
+			recordFields[i] = keySerde;
+			// if (!isSecondary) {
+			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+					.getBinaryComparatorFactory(keyType, OrderKind.ASC);
+			// }
+			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+			++i;
+		}
 
-        ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
-        ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+		ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+		ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
 
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+				.getAppContext();
+		RecordDescriptor recDesc = new RecordDescriptor(recordFields);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-        try {
-            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+		try {
+			spPc = metadata
+					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+							datasetName, indexName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, recDesc,
-                appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(), spPc.first,
-                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories, true, lowKeyFields,
-                highKeyFields, lowKeyInclusive, highKeyInclusive, new BTreeDataflowHelperFactory());
+		BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(
+				jobSpec, recDesc, appContext.getStorageManagerInterface(),
+				appContext.getTreeRegisterProvider(), spPc.first,
+				interiorFrameFactory, leafFrameFactory, typeTraits,
+				comparatorFactories, true, lowKeyFields, highKeyFields,
+				lowKeyInclusive, highKeyInclusive,
+				new BTreeDataflowHelperFactory());
 
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
-    }
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				btreeSearchOp, spPc.second);
+	}
 
-    @SuppressWarnings("rawtypes")
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
-            AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
-            String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
-            throws AlgebricksException {
-        String itemTypeName = ddecl.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(itemTypeName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+	@SuppressWarnings("rawtypes")
+	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
+			AqlCompiledMetadataDeclarations metadata, JobGenContext context,
+			JobSpecification jobSpec, String datasetName,
+			AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
+			throws AlgebricksException {
+		String itemTypeName = ddecl.getItemTypeName();
+		IAType itemType;
+		try {
+			itemType = metadata.findType(itemTypeName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        boolean isSecondary = true;
-        AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
-        if (primIdxDecl != null) {
-            isSecondary = !indexName.equals(primIdxDecl.getIndexName());
-        }
+		boolean isSecondary = true;
+		AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
+		if (primIdxDecl != null) {
+			isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+		}
 
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
-        ISerializerDeserializer[] recordFields;
-        IBinaryComparatorFactory[] comparatorFactories;
-        ITypeTraits[] typeTraits;
-        IPrimitiveValueProviderFactory[] valueProviderFactories;
-        int numSecondaryKeys = 0;
-        int numNestedSecondaryKeyFields = 0;
-        int i = 0;
-        if (isSecondary) {
-            AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
-            if (cid == null) {
-                throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
-                        + datasetName);
-            }
-            List<String> secondaryKeyFields = cid.getFieldExprs();
-            numSecondaryKeys = secondaryKeyFields.size();
+		int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl)
+				.size();
+		ISerializerDeserializer[] recordFields;
+		IBinaryComparatorFactory[] comparatorFactories;
+		ITypeTraits[] typeTraits;
+		IPrimitiveValueProviderFactory[] valueProviderFactories;
+		int numSecondaryKeys = 0;
+		int numNestedSecondaryKeyFields = 0;
+		int i = 0;
+		if (isSecondary) {
+			AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+					ddecl, indexName);
+			if (cid == null) {
+				throw new AlgebricksException(
+						"Code generation error: no index " + indexName
+								+ " for dataset " + datasetName);
+			}
+			List<String> secondaryKeyFields = cid.getFieldExprs();
+			numSecondaryKeys = secondaryKeyFields.size();
 
-            if (numSecondaryKeys != 1) {
-                throw new AlgebricksException(
-                        "Cannot use "
-                                + numSecondaryKeys
-                                + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
-            }
+			if (numSecondaryKeys != 1) {
+				throw new AlgebricksException(
+						"Cannot use "
+								+ numSecondaryKeys
+								+ " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+			}
 
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-            ARecordType recType = (ARecordType) itemType;
+			if (itemType.getTypeTag() != ATypeTag.RECORD) {
+				throw new AlgebricksException(
+						"Only record types can be indexed.");
+			}
+			ARecordType recType = (ARecordType) itemType;
 
-            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), recType);
-            if (keyType == null) {
-                throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-            }
+			IAType keyType = AqlCompiledIndexDecl.keyFieldType(
+					secondaryKeyFields.get(0), recType);
+			if (keyType == null) {
+				throw new AlgebricksException("Could not find field "
+						+ secondaryKeyFields.get(0) + " in the schema.");
+			}
 
-            int dimension = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
-            numNestedSecondaryKeyFields = dimension * 2;
+			int dimension = NonTaggedFormatUtil.getNumDimensions(keyType
+					.getTypeTag());
+			numNestedSecondaryKeyFields = dimension * 2;
 
-            int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
-            recordFields = new ISerializerDeserializer[numFields];
-            typeTraits = new ITypeTraits[numFields];
-            comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-            valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+			int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
+			recordFields = new ISerializerDeserializer[numFields];
+			typeTraits = new ITypeTraits[numFields];
+			comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+			valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
 
-            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-            for (i = 0; i < numNestedSecondaryKeyFields; i++) {
-                ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(nestedKeyType);
-                recordFields[i] = keySerde;
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                        nestedKeyType, OrderKind.ASC);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-            }
-        } else {
-            throw new AlgebricksException("R-tree can only be used as a secondary index");
-        }
+			IAType nestedKeyType = NonTaggedFormatUtil
+					.getNestedSpatialType(keyType.getTypeTag());
+			for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+				ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+						.getSerializerDeserializer(nestedKeyType);
+				recordFields[i] = keySerde;
+				comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+						.getBinaryComparatorFactory(nestedKeyType,
+								OrderKind.ASC);
+				typeTraits[i] = AqlTypeTraitProvider.INSTANCE
+						.getTypeTrait(nestedKeyType);
+				valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+			}
+		} else {
+			throw new AlgebricksException(
+					"R-tree can only be used as a secondary index");
+		}
 
-        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(ddecl)) {
-            IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(keyType);
-            recordFields[i] = keySerde;
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
-        }
+		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+				.getPartitioningFunctions(ddecl)) {
+			IAType keyType = evalFactoryAndType.third;
+			ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+					.getSerializerDeserializer(keyType);
+			recordFields[i] = keySerde;
+			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+			++i;
+		}
 
-        ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(
-                new RTreeTypeAwareTupleWriterFactory(typeTraits), valueProviderFactories);
-        ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(new RTreeTypeAwareTupleWriterFactory(
-                typeTraits), valueProviderFactories);
-        /*
-                ITreeIndexFrameFactory interiorFrameFactory = JobGenHelper.createRTreeNSMInteriorFrameFactory(typeTraits,
-                        numNestedSecondaryKeyFields);
-                ITreeIndexFrameFactory leafFrameFactory = JobGenHelper.createRTreeNSMLeafFrameFactory(typeTraits,
-                        numNestedSecondaryKeyFields);
-        */
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+		ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(
+				new RTreeTypeAwareTupleWriterFactory(typeTraits),
+				valueProviderFactories);
+		ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(
+				new RTreeTypeAwareTupleWriterFactory(typeTraits),
+				valueProviderFactories);
+		/*
+		 * ITreeIndexFrameFactory interiorFrameFactory =
+		 * JobGenHelper.createRTreeNSMInteriorFrameFactory(typeTraits,
+		 * numNestedSecondaryKeyFields); ITreeIndexFrameFactory leafFrameFactory
+		 * = JobGenHelper.createRTreeNSMLeafFrameFactory(typeTraits,
+		 * numNestedSecondaryKeyFields);
+		 */
+		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+				.getAppContext();
+		RecordDescriptor recDesc = new RecordDescriptor(recordFields);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-        try {
-            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+		try {
+			spPc = metadata
+					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+							datasetName, indexName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
-                appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(), spPc.first,
-                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories, keyFields,
-                new RTreeDataflowHelperFactory());
+		RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(
+				jobSpec, recDesc, appContext.getStorageManagerInterface(),
+				appContext.getTreeRegisterProvider(), spPc.first,
+				interiorFrameFactory, leafFrameFactory, typeTraits,
+				comparatorFactories, keyFields,
+				new RTreeDataflowHelperFactory());
 
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
-    }
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				rtreeSearchOp, spPc.second);
+	}
 
-    @Override
-    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
-        FileSplitDataSink fsds = (FileSplitDataSink) sink;
-        FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
-        FileSplit fs = fssi.getFileSplit();
-        File outFile = fs.getLocalFile().getFile();
-        String nodeId = fs.getNodeName();
+	@Override
+	public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(
+			IDataSink sink, int[] printColumns,
+			IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+		FileSplitDataSink fsds = (FileSplitDataSink) sink;
+		FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
+		FileSplit fs = fssi.getFileSplit();
+		File outFile = fs.getLocalFile().getFile();
+		String nodeId = fs.getNodeName();
 
-        SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
-                metadata.getWriterFactory(), inputDesc);
-        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
-        return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
-    }
+		SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(
+				printColumns, printerFactories, outFile, metadata
+						.getWriterFactory(), inputDesc);
+		AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(
+				new String[] { nodeId });
+		return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(
+				runtime, apc);
+	}
 
-    @Override
-    public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
-            throws AlgebricksException {
-        AqlDataSource ads = findDataSource(dataSourceId);
-        AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
-        if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AlgebricksException("No index for external dataset " + dataSourceId);
-        }
+	@Override
+	public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(
+			String indexId, AqlSourceId dataSourceId)
+			throws AlgebricksException {
+		AqlDataSource ads = findDataSource(dataSourceId);
+		AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
+		if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+			throw new AlgebricksException("No index for external dataset "
+					+ dataSourceId);
+		}
 
-        String idxName = (String) indexId;
-        AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(adecl, idxName);
-        AqlSourceId asid = (AqlSourceId) dataSourceId;
-        if (acid != null) {
-            return new AqlIndex(acid, metadata, asid.getDatasetName());
-        } else {
-            AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
-            if (primIdx.getIndexName().equals(indexId)) {
-                return new AqlIndex(primIdx, metadata, asid.getDatasetName());
-            } else {
-                return null;
-            }
-        }
-    }
+		String idxName = (String) indexId;
+		AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(
+				adecl, idxName);
+		AqlSourceId asid = (AqlSourceId) dataSourceId;
+		if (acid != null) {
+			return new AqlIndex(acid, metadata, asid.getDatasetName());
+		} else {
+			AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
+			if (primIdx.getIndexName().equals(indexId)) {
+				return new AqlIndex(primIdx, metadata, asid.getDatasetName());
+			} else {
+				return null;
+			}
+		}
+	}
 
-    public static AqlDataSource lookupSourceInMetadata(AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
-            throws AlgebricksException {
-        if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
-            return null;
-        }
-        AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId.getDatasetName());
-        if (acdd == null) {
-            throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
-        }
-        String tName = acdd.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(tName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        return new AqlDataSource(aqlId, acdd, itemType);
-    }
+	public static AqlDataSource lookupSourceInMetadata(
+			AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
+			throws AlgebricksException {
+		if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
+			return null;
+		}
+		AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId
+				.getDatasetName());
+		if (acdd == null) {
+			throw new AlgebricksException("Datasource with id " + aqlId
+					+ " was not found.");
+		}
+		String tName = acdd.getItemTypeName();
+		IAType itemType;
+		try {
+			itemType = metadata.findType(tName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
+		return new AqlDataSource(aqlId, acdd, itemType);
+	}
 
-    @Override
-    public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
-        AqlSourceId asid = dataSource.getId();
-        String datasetName = asid.getDatasetName();
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
-        if (adecl == null) {
-            throw new IllegalArgumentException("Unknown dataset " + datasetName);
-        }
-        return adecl.getDatasetType() == DatasetType.EXTERNAL;
-    }
+	@Override
+	public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
+		AqlSourceId asid = dataSource.getId();
+		String datasetName = asid.getDatasetName();
+		AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
+		if (adecl == null) {
+			throw new IllegalArgumentException("Unknown dataset " + datasetName);
+		}
+		return adecl.getDatasetType() == DatasetType.EXTERNAL;
+	}
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        String datasetName = dataSource.getId().getDatasetName();
-        int numKeys = keys.size();
-        // move key fields to front
-        int[] fieldPermutation = new int[numKeys + 1];
-        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+	@Override
+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
+			IDataSource<AqlSourceId> dataSource,
+			IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+			LogicalVariable payload, JobGenContext context,
+			JobSpecification spec) throws AlgebricksException {
+		String datasetName = dataSource.getId().getDatasetName();
+		int numKeys = keys.size();
+		// move key fields to front
+		int[] fieldPermutation = new int[numKeys + 1];
+		// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+		int i = 0;
+		for (LogicalVariable varKey : keys) {
+			int idx = propagatedSchema.findVariable(varKey);
+			fieldPermutation[i] = idx;
+			i++;
+		}
+		fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+				.findDataset(datasetName);
+		if (compiledDatasetDecl == null) {
+			throw new AlgebricksException("Unknown dataset " + datasetName);
+		}
+		String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
+				.getIndexName();
 
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+		ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
+				compiledDatasetDecl, metadata);
 
-        ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
-        ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+		ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+		ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+				.getAppContext();
 
-        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
+		IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
+				.computeKeysBinaryComparatorFactories(compiledDatasetDecl,
+						context.getBinaryComparatorFactoryProvider());
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+		try {
+			splitsAndConstraint = metadata
+					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+							datasetName, indexName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
-                splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
-                fieldPermutation, GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
-    }
+		TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
+				spec, appContext.getStorageManagerInterface(), appContext
+						.getTreeRegisterProvider(), splitsAndConstraint.first,
+				interiorFrameFactory, leafFrameFactory, typeTraits,
+				comparatorFactories, fieldPermutation,
+				GlobalConfig.DEFAULT_BTREE_FILL_FACTOR,
+				new BTreeDataflowHelperFactory());
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				btreeBulkLoad, splitsAndConstraint.second);
+	}
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
-        String datasetName = dataSource.getId().getDatasetName();
-        int numKeys = keys.size();
-        // move key fields to front
-        int[] fieldPermutation = new int[numKeys + 1];
-        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+	@Override
+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
+			IDataSource<AqlSourceId> dataSource,
+			IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+			LogicalVariable payload, RecordDescriptor recordDesc,
+			JobGenContext context, JobSpecification spec)
+			throws AlgebricksException {
+		String datasetName = dataSource.getId().getDatasetName();
+		int numKeys = keys.size();
+		// move key fields to front
+		int[] fieldPermutation = new int[numKeys + 1];
+		// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+		int i = 0;
+		for (LogicalVariable varKey : keys) {
+			int idx = propagatedSchema.findVariable(varKey);
+			fieldPermutation[i] = idx;
+			i++;
+		}
+		fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+				.findDataset(datasetName);
+		if (compiledDatasetDecl == null) {
+			throw new AlgebricksException("Unknown dataset " + datasetName);
+		}
+		String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
+				.getIndexName();
 
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+		ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
+				compiledDatasetDecl, metadata);
 
-        ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
-        ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+		ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+		ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+				.getAppContext();
 
-        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
+		IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
+				.computeKeysBinaryComparatorFactories(compiledDatasetDecl,
+						context.getBinaryComparatorFactoryProvider());
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+		try {
+			splitsAndConstraint = metadata
+					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+							datasetName, indexName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
-                splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
-                new BTreeDataflowHelperFactory(), fieldPermutation, IndexOp.INSERT, txnId);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
-    }
+		TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+				spec, recordDesc, appContext.getStorageManagerInterface(),
+				appContext.getTreeRegisterProvider(),
+				splitsAndConstraint.first, interiorFrameFactory,
+				leafFrameFactory, typeTraits, comparatorFactories,
+				new BTreeDataflowHelperFactory(), fieldPermutation,
+				IndexOp.INSERT, txnId);
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				btreeBulkLoad, splitsAndConstraint.second);
+	}
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
-        String datasetName = dataSource.getId().getDatasetName();
-        int numKeys = keys.size();
-        // move key fields to front
-        int[] fieldPermutation = new int[numKeys + 1];
-        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+	@Override
+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
+			IDataSource<AqlSourceId> dataSource,
+			IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+			LogicalVariable payload, RecordDescriptor recordDesc,
+			JobGenContext context, JobSpecification spec)
+			throws AlgebricksException {
+		String datasetName = dataSource.getId().getDatasetName();
+		int numKeys = keys.size();
+		// move key fields to front
+		int[] fieldPermutation = new int[numKeys + 1];
+		// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+		int i = 0;
+		for (LogicalVariable varKey : keys) {
+			int idx = propagatedSchema.findVariable(varKey);
+			fieldPermutation[i] = idx;
+			i++;
+		}
+		fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+				.findDataset(datasetName);
+		if (compiledDatasetDecl == null) {
+			throw new AlgebricksException("Unknown dataset " + datasetName);
+		}
+		String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
+				.getIndexName();
 
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+		ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
+				compiledDatasetDecl, metadata);
 
-        ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
-        ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+		ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+		ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+				.getAppContext();
 
-        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
+		IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
+				.computeKeysBinaryComparatorFactories(compiledDatasetDecl,
+						context.getBinaryComparatorFactoryProvider());
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+		try {
+			splitsAndConstraint = metadata
+					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+							datasetName, indexName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
 
-        TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
-                splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
-                new BTreeDataflowHelperFactory(), fieldPermutation, IndexOp.DELETE, txnId);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
-    }
+		TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+				spec, recordDesc, appContext.getStorageManagerInterface(),
+				appContext.getTreeRegisterProvider(),
+				splitsAndConstraint.first, interiorFrameFactory,
+				leafFrameFactory, typeTraits, comparatorFactories,
+				new BTreeDataflowHelperFactory(), fieldPermutation,
+				IndexOp.DELETE, txnId);
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				btreeBulkLoad, splitsAndConstraint.second);
+	}
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        String indexName = dataSourceIndex.getId();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+	@Override
+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+			IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
+			IOperatorSchema propagatedSchema,
+			List<LogicalVariable> primaryKeys,
+			List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+			JobGenContext context, JobSpecification spec)
+			throws AlgebricksException {
+		String indexName = dataSourceIndex.getId();
+		String datasetName = dataSourceIndex.getDataSource().getId()
+				.getDatasetName();
+		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+				.findDataset(datasetName);
+		if (compiledDatasetDecl == null) {
+			throw new AlgebricksException("Unknown dataset " + datasetName);
+		}
+		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+				compiledDatasetDecl, indexName);
 
-        if (cid.getKind() == IndexKind.BTREE)
-            return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, recordDesc,
-                    context, spec, IndexOp.INSERT);
-        else
-            return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, recordDesc,
-                    context, spec, IndexOp.INSERT);
-    }
+		if (cid.getKind() == IndexKind.BTREE)
+			return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
+					primaryKeys, secondaryKeys, recordDesc, context, spec,
+					IndexOp.INSERT);
+		else
+			return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
+					primaryKeys, secondaryKeys, recordDesc, context, spec,
+					IndexOp.INSERT);
+	}
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        String indexName = dataSourceIndex.getId();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
-        if (cid.getKind() == IndexKind.BTREE)
-            return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, recordDesc,
-                    context, spec, IndexOp.DELETE);
-        else
-            return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, recordDesc,
-                    context, spec, IndexOp.DELETE);
-    }
+	@Override
+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+			IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
+			IOperatorSchema propagatedSchema,
+			List<LogicalVariable> primaryKeys,
+			List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+			JobGenContext context, JobSpecification spec)
+			throws AlgebricksException {
+		String indexName = dataSourceIndex.getId();
+		String datasetName = dataSourceIndex.getDataSource().getId()
+				.getDatasetName();
+		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+				.findDataset(datasetName);
+		if (compiledDatasetDecl == null) {
+			throw new AlgebricksException("Unknown dataset " + datasetName);
+		}
+		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+				compiledDatasetDecl, indexName);
+		if (cid.getKind() == IndexKind.BTREE)
+			return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
+					primaryKeys, secondaryKeys, recordDesc, context, spec,
+					IndexOp.DELETE);
+		else
+			return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
+					primaryKeys, secondaryKeys, recordDesc, context, spec,
+					IndexOp.DELETE);
+	}
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String datasetName,
-            String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
-        int numKeys = primaryKeys.size() + secondaryKeys.size();
-        // generate field permutations
-        int[] fieldPermutation = new int[numKeys];
-        int i = 0;
-        for (LogicalVariable varKey : secondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
+	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(
+			String datasetName, String indexName,
+			IOperatorSchema propagatedSchema,
+			List<LogicalVariable> primaryKeys,
+			List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+			JobGenContext context, JobSpecification spec, IndexOp indexOp)
+			throws AlgebricksException {
+		int numKeys = primaryKeys.size() + secondaryKeys.size();
+		// generate field permutations
+		int[] fieldPermutation = new int[numKeys];
+		int i = 0;
+		for (LogicalVariable varKey : secondaryKeys) {
+			int idx = propagatedSchema.findVariable(varKey);
+			fieldPermutation[i] = idx;
+			i++;
+		}
+		for (LogicalVariable varKey : primaryKeys) {
+			int idx = propagatedSchema.findVariable(varKey);
+			fieldPermutation[i] = idx;
+			i++;
+		}
 
-        // dataset
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        String itemTypeName = compiledDatasetDecl.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(itemTypeName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Only record types can be indexed.");
-        }
-        ARecordType recType = (ARecordType) itemType;
+		// dataset
+		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+				.findDataset(datasetName);
+		if (compiledDatasetDecl == null) {
+			throw new AlgebricksException("Unknown dataset " + datasetName);
+		}
+		String itemTypeName = compiledDatasetDecl.getItemTypeName();
+		IAType itemType;
+		try {
+			itemType = metadata.findType(itemTypeName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
+		if (itemType.getTypeTag() != ATypeTag.RECORD) {
+			throw new AlgebricksException("Only record types can be indexed.");
+		}
+		ARecordType recType = (ARecordType) itemType;
 
-        // index parameters
-        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
-        List<String> secondaryKeyExprs = cid.getFieldExprs();
-        ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
-        for (i = 0; i < secondaryKeys.size(); ++i) {
-            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyExprs.get(i).toString(), recType);
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    OrderKind.ASC);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl)) {
-            IAType keyType = evalFactoryAndType.third;
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    OrderKind.ASC);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
-        }
+		// index parameters
+		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+				compiledDatasetDecl, indexName);
+		List<String> secondaryKeyExprs = cid.getFieldExprs();
+		ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+		for (i = 0; i < secondaryKeys.size(); ++i) {
+			IAType keyType = AqlCompiledIndexDecl.keyFieldType(
+					secondaryKeyExprs.get(i).toString(), recType);
+			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+					.getBinaryComparatorFactory(keyType, OrderKind.ASC);
+			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+		}
+		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+				.getPartitioningFunctions(compiledDatasetDecl)) {
+			IAType keyType = evalFactoryAndType.third;
+			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+					.getBinaryComparatorFactory(keyType, OrderKind.ASC);
+			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+			++i;
+		}
 
-        ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
-        ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
-                splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
-                new BTreeDataflowHelperFactory(), fieldPermutation, indexOp, txnId);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
-    }
+		ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+		ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+				.getAppContext();
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+		try {
+			splitsAndConstraint = metadata
+					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+							datasetName, indexName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
+		TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+				spec, recordDesc, appContext.getStorageManagerInterface(),
+				appContext.getTreeRegisterProvider(),
+				splitsAndConstraint.first, interiorFrameFactory,
+				leafFrameFactory, typeTraits, comparatorFactories,
+				new BTreeDataflowHelperFactory(), fieldPermutation, indexOp,
+				txnId);
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				btreeBulkLoad, splitsAndConstraint.second);
+	}
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String datasetName,
-            String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        String itemTypeName = compiledDatasetDecl.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(itemTypeName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Only record types can be indexed.");
-        }
-        ARecordType recType = (ARecordType) itemType;
-        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
-        List<String> secondaryKeyExprs = cid.getFieldExprs();
-        IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyExprs.get(0).toString(), recType);
-        int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-        int numSecondaryKeys = dimension * 2;
-        int numPrimaryKeys = primaryKeys.size();
-        int numKeys = numSecondaryKeys + numPrimaryKeys;
-        ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
-        int[] fieldPermutation = new int[numKeys];
-        int i = 0;
+	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(
+			String datasetName, String indexName,
+			IOperatorSchema propagatedSchema,
+			List<LogicalVariable> primaryKeys,
+			List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+			JobGenContext context, JobSpecification spec, IndexOp indexOp)
+			throws AlgebricksException {
+		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+				.findDataset(datasetName);
+		String itemTypeName = compiledDatasetDecl.getItemTypeName();
+		IAType itemType;
+		try {
+			itemType = metadata.findType(itemTypeName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
+		if (itemType.getTypeTag() != ATypeTag.RECORD) {
+			throw new AlgebricksException("Only record types can be indexed.");
+		}
+		ARecordType recType = (ARecordType) itemType;
+		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+				compiledDatasetDecl, indexName);
+		List<String> secondaryKeyExprs = cid.getFieldExprs();
+		IAType spatialType = AqlCompiledIndexDecl.keyFieldType(
+				secondaryKeyExprs.get(0).toString(), recType);
+		int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType
+				.getTypeTag());
+		int numSecondaryKeys = dimension * 2;
+		int numPrimaryKeys = primaryKeys.size();
+		int numKeys = numSecondaryKeys + numPrimaryKeys;
+		ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+		int[] fieldPermutation = new int[numKeys];
+		int i = 0;
 
-        for (LogicalVariable varKey : secondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
-        for (i = 0; i < numSecondaryKeys; i++) {
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    nestedKeyType, OrderKind.ASC);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-        }
-        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl)) {
-            IAType keyType = evalFactoryAndType.third;
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    OrderKind.ASC);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
-        }
+		for (LogicalVariable varKey : secondaryKeys) {
+			int idx = propagatedSchema.findVariable(varKey);
+			fieldPermutation[i] = idx;
+			i++;
+		}
+		for (LogicalVariable varKey : primaryKeys) {
+			int idx = propagatedSchema.findVariable(varKey);
+			fieldPermutation[i] = idx;
+			i++;
+		}
+		IAType nestedKeyType = NonTaggedFormatUtil
+				.getNestedSpatialType(spatialType.getTypeTag());
+		IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
+		for (i = 0; i < numSecondaryKeys; i++) {
+			ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+					.getSerializerDeserializer(nestedKeyType);
+			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+					.getBinaryComparatorFactory(nestedKeyType, OrderKind.ASC);
+			typeTraits[i] = AqlTypeTraitProvider.INSTANCE
+					.getTypeTrait(nestedKeyType);
+			valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+		}
+		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+				.getPartitioningFunctions(compiledDatasetDecl)) {
+			IAType keyType = evalFactoryAndType.third;
+			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+					.getBinaryComparatorFactory(keyType, OrderKind.ASC);
+			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+			++i;
+		}
 
-        ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(
-                new RTreeTypeAwareTupleWriterFactory(typeTraits), valueProviderFactories);
-        ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(new RTreeTypeAwareTupleWriterFactory(
-                typeTraits), valueProviderFactories);
+		ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(
+				new RTreeTypeAwareTupleWriterFactory(typeTraits),
+				valueProviderFactories);
+		ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(
+				new RTreeTypeAwareTupleWriterFactory(typeTraits),
+				valueProviderFactories);
 
-        /*
-        ITreeIndexFrameFactory interiorFrameFactory = JobGenHelper.createRTreeNSMInteriorFrameFactory(typeTraits,
-                numSecondaryKeys);
-        ITreeIndexFrameFactory leafFrameFactory = JobGenHelper.createRTreeNSMLeafFrameFactory(typeTraits,
-                numSecondaryKeys);
-        */
+		/*
+		 * ITreeIndexFrameFactory interiorFrameFactory =
+		 * JobGenHelper.createRTreeNSMInteriorFrameFactory(typeTraits,
+		 * numSecondaryKeys); ITreeIndexFrameFactory leafFrameFactory =
+		 * JobGenHelper.createRTreeNSMLeafFrameFactory(typeTraits,
+		 * numSecondaryKeys);
+		 */
 
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
-                splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
-                new RTreeDataflowHelperFactory(), fieldPermutation, indexOp, txnId);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
-    }
+		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+				.getAppContext();
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+		try {
+			splitsAndConstraint = metadata
+					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+							datasetName, indexName);
+		} catch (Exception e) {
+			throw new AlgebricksException(e);
+		}
+		TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+				spec, recordDesc, appContext.getStorageManagerInterface(),
+				appContext.getTreeRegisterProvider(),
+				splitsAndConstraint.first, interiorFrameFactory,
+				leafFrameFactory, typeTraits, comparatorFactories,
+				new RTreeDataflowHelperFactory(), fieldPermutation, indexOp,
+				txnId);
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+				rtreeUpdate, splitsAndConstraint.second);
+	}
 
-    public long getTxnId() {
-        return txnId;
-    }
+	public long getTxnId() {
+		return txnId;
+	}
 
-    public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
-        return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
-    }
+	public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(
+			ITypeTraits[] typeTraits) {
+		return new BTreeNSMInteriorFrameFactory(
+				new TypeAwareTupleWriterFactory(typeTraits));
+	}
 
-    public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(ITypeTraits[] typeTraits) {
-        return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
-    }
+	public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(
+			ITypeTraits[] typeTraits) {
+		return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+				typeTraits));
+	}
 
-    @Override
-    public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
-        return AsterixBuiltinFunctions.lookupFunction(fid);
-    }
+	@Override
+	public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+		return AsterixBuiltinFunctions.lookupFunction(fid);
+	}
 
 }
