Merged in master
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index dde4443..e0ee1f0 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 
 public abstract class AbstractCompilerFactoryBuilder {
 
@@ -50,6 +51,7 @@
     protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
     protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
     protected IPrinterFactoryProvider printerProvider;
+    protected IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
     protected IExpressionRuntimeProvider expressionRuntimeProvider;
     protected IExpressionTypeComputer expressionTypeComputer;
     protected INullableTypeComputer nullableTypeComputer;
@@ -111,6 +113,14 @@
     public IBinaryComparatorFactoryProvider getComparatorFactoryProvider() {
         return comparatorFactoryProvider;
     }
+    
+    public void setPredicateEvaluatorFactoryProvider(IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider) {
+        this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
+    }
+
+    public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactory() {
+        return predEvaluatorFactoryProvider;
+    }
 
     public void setBinaryBooleanInspectorFactory(IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
         this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 2b24bd0..1c62307 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -91,7 +91,7 @@
                                 binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
                                 nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
                                 expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
-                                partialAggregationTypeComputer, frameSize, clusterLocations);
+                                partialAggregationTypeComputer, predEvaluatorFactoryProvider, frameSize, clusterLocations);
                         PlanCompiler pc = new PlanCompiler(context);
                         return pc.compilePlan(plan, null, jobEventListenerFactory);
                     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 6da42b4..d745abd 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -40,6 +40,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -106,6 +108,10 @@
             Object t = env.getVarType(v);
             comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
         }
+        
+        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+        IPredicateEvaluatorFactory predEvaluatorFactory = ( predEvaluatorFactoryProvider == null ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+        
         RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
                 propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -125,7 +131,7 @@
                     case INNER: {
                         opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                                hashFunFactories, comparatorFactories, recDescriptor);
+                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory);
                         break;
                     }
                     case LEFT_OUTER: {
@@ -135,7 +141,7 @@
                         }
                         opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                                hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
+                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true, nullWriterFactories);
                         break;
                     }
                     default: {
@@ -153,7 +159,7 @@
                                 maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
                                 comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
                                         keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysRight, keysLeft));
+                                        keysRight, keysLeft), predEvaluatorFactory);
                         break;
                     }
                     case LEFT_OUTER: {
@@ -165,7 +171,7 @@
                                 maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
                                 comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
                                         keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysRight, keysLeft), true, nullWriterFactories);
+                                        keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories);
                         break;
                     }
                     default: {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index b09c194..2c59151 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -37,6 +37,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
@@ -86,6 +88,10 @@
             Object t = env.getVarType(v);
             comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
         }
+        
+        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+        IPredicateEvaluatorFactory predEvaluatorFactory = ( predEvaluatorFactoryProvider == null ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+        
         RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
                 propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -94,7 +100,7 @@
         switch (kind) {
             case INNER: {
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, tableSize);
+                        comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory);
                 break;
             }
             case LEFT_OUTER: {
@@ -103,7 +109,7 @@
                     nullWriterFactories[j] = context.getNullWriterFactory();
                 }
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, true, nullWriterFactories, tableSize);
+                        comparatorFactories, predEvaluatorFactory, recDescriptor, true, nullWriterFactories, tableSize);
                 break;
             }
             default: {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 365d1a5..245e5e1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -41,6 +41,8 @@
 import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 
 public class JobGenContext {
 	private final IOperatorSchema outerFlowSchema;
@@ -61,6 +63,7 @@
 	private final IExpressionTypeComputer expressionTypeComputer;
 	private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
 	private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+	private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
 	private final int frameSize;
 	private AlgebricksPartitionConstraint clusterLocations;
 	private int varCounter;
@@ -86,7 +89,7 @@
 			ITypingContext typingContext,
 			IExpressionEvalSizeComputer expressionEvalSizeComputer,
 			IPartialAggregationTypeComputer partialAggregationTypeComputer,
-			int frameSize, AlgebricksPartitionConstraint clusterLocations) {
+			IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize, AlgebricksPartitionConstraint clusterLocations) {
 		this.outerFlowSchema = outerFlowSchema;
 		this.metadataProvider = metadataProvider;
 		this.appContext = appContext;
@@ -106,6 +109,7 @@
 		this.typingContext = typingContext;
 		this.expressionEvalSizeComputer = expressionEvalSizeComputer;
 		this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+		this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
 		this.frameSize = frameSize;
 		this.varCounter = 0;
 	}
@@ -157,6 +161,10 @@
 	public IPrinterFactoryProvider getPrinterFactoryProvider() {
 		return printerFactoryProvider;
 	}
+	
+	public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider(){
+		return predEvaluatorFactoryProvider;
+	}
 
 	public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
 		return expressionRuntimeProvider;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index c4b7802..733c7d3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -27,11 +27,9 @@
 public interface ICCApplicationContext extends IApplicationContext {
     /**
      * Sets the state that must be distributed by the infrastructure to all the
-     * NC application contexts. Any state set by calling this method in
-     * the {@link ICCApplicationEntryPoint#start(ICCApplicationContext, String[])} call
-     * is made available to all the {@link INCApplicationContext} objects at each Node Controller.
-     * The state is then available to be inspected by the application at the NC during or
-     * after the {@link INCBootstrap#start()} call.
+     * NC application contexts. Any state set by calling this method in the {@link ICCApplicationEntryPoint#start(ICCApplicationContext, String[])} call is made available to all the {@link INCApplicationContext} objects
+     * at each Node Controller. The state is then available to be inspected by
+     * the application at the NC during or after the {@link INCBootstrap#start()} call.
      * 
      * @param state
      *            The distributed state
@@ -47,6 +45,14 @@
     public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
 
     /**
+     * A listener that listens to Cluster Lifecycle events at the Cluster
+     * Controller.
+     * 
+     * @param jobLifecycleListener
+     */
+    public void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
+
+    /**
      * Get the Cluster Controller Context.
      * 
      * @return The Cluster Controller Context.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
new file mode 100644
index 0000000..51db13e
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A listener interface for providing notification call backs to events such as a Node Controller joining/leaving the cluster.
+ */
+public interface IClusterLifecycleListener {
+
+    /**
+     * @param nodeId
+     *            A unique identifier of a Node Controller
+     * @param ncConfig
+     *            A map containing the set of configuration parameters that were used to start the Node Controller
+     */
+    public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration);
+
+    /**
+     * @param deadNodeIds
+     *            A set of Node Controller Ids that have left the cluster. The set is not cumulative.
+     */
+    public void notifyNodeFailure(Set<String> deadNodeIds);
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
new file mode 100644
index 0000000..575472c
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
+/*
+ * Compares two tuples to make sure that records, whose comparison keys are NULL do not pass comparator filter  
+ */
+public interface IPredicateEvaluator {
+	public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
new file mode 100644
index 0000000..bc2f339
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+/*
+ * Provides PredicateEvaluator for equi-join related operators 
+ */
+
+public interface IPredicateEvaluatorFactory extends Serializable {
+	public IPredicateEvaluator createPredicateEvaluator();
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
new file mode 100644
index 0000000..029ab21
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+/*
+ * Provides PredicateEvaluatorFactory based on (equi-join) keys 		
+ */
+
+public interface IPredicateEvaluatorFactoryProvider extends Serializable{
+	public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1);
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index dc99ef3..563ee1b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.dataset;
 
 import java.util.HashMap;
+import java.util.List;
 
 public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
     public enum Status {
@@ -27,6 +28,8 @@
 
     private Status status;
 
+    private List<Exception> exceptions;
+
     public DatasetJobRecord() {
         this.status = Status.RUNNING;
     }
@@ -43,7 +46,16 @@
         status = Status.FAILED;
     }
 
+    public void fail(List<Exception> exceptions) {
+        status = Status.FAILED;
+        this.exceptions = exceptions;
+    }
+
     public Status getStatus() {
         return status;
     }
+
+    public List<Exception> getExceptions() {
+        return exceptions;
+    }
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 52e6005..d152cf5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -28,7 +30,7 @@
 
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
 
-    public void reportJobFailure(JobId jobId);
+    public void reportJobFailure(JobId jobId, List<Exception> exceptions);
 
     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
 
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 33e1b01..97d3744 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -106,12 +106,8 @@
                     lastMonitor = getMonitor(lastReadPartition);
                     resultChannel.open(datasetClientCtx);
                     resultChannel.registerMonitor(lastMonitor);
-                } catch (HyracksException e) {
-                    throw new HyracksDataException(e);
-                } catch (UnknownHostException e) {
-                    throw new HyracksDataException(e);
                 } catch (Exception e) {
-                    // Do nothing here.
+                    throw new HyracksDataException(e);
                 }
             }
         }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5976760..5294428 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -19,6 +19,7 @@
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
@@ -117,6 +118,8 @@
 
     private final Map<JobId, JobRun> runMapArchive;
 
+    private final Map<JobId, List<Exception>> runMapHistory;
+
     private final WorkQueue workQueue;
 
     private final ExecutorService executor;
@@ -156,6 +159,15 @@
                 return size() > ccConfig.jobHistorySize;
             }
         };
+        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+            private static final long serialVersionUID = 1L;
+            /** history size + 1 is for the case when history size = 0 */
+            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+
+            protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
+                return size() > allowedSize;
+            }
+        };
         workQueue = new WorkQueue();
         this.timer = new Timer(true);
         final ClusterTopology topology = computeClusterTopology(ccConfig);
@@ -252,6 +264,10 @@
         return runMapArchive;
     }
 
+    public Map<JobId, List<Exception>> getRunHistory() {
+        return runMapHistory;
+    }
+
     public Map<String, Set<String>> getIpAddressNodeNameMap() {
         return ipAddressNodeNameMap;
     }
@@ -554,4 +570,4 @@
     public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
         deploymentRunMap.remove(deploymentKey);
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 7e1581a..f884c6b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -19,9 +19,11 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -41,6 +43,7 @@
     protected IResultCallback<Object> deinitializationCallback;
 
     private List<IJobLifecycleListener> jobLifecycleListeners;
+    private List<IClusterLifecycleListener> clusterLifecycleListeners;
 
     public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext) throws IOException {
         super(serverCtx);
@@ -48,6 +51,7 @@
         initPendingNodeIds = new HashSet<String>();
         deinitPendingNodeIds = new HashSet<String>();
         jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
+        clusterLifecycleListeners = new ArrayList<IClusterLifecycleListener>();
     }
 
     public ICCContext getCCContext() {
@@ -82,4 +86,21 @@
             l.notifyJobCreation(jobId, acggf);
         }
     }
+
+    @Override
+    public void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener) {
+        clusterLifecycleListeners.add(clusterLifecycleListener);
+    }
+
+    public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException {
+        for (IClusterLifecycleListener l : clusterLifecycleListeners) {
+            l.notifyNodeJoin(nodeId, ncConfiguration);
+        }
+    }
+
+    public void notifyNodeFailure(Set<String> deadNodeIds) {
+        for (IClusterLifecycleListener l : clusterLifecycleListeners) {
+            l.notifyNodeFailure(deadNodeIds);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 21b05d4..36082b0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -16,6 +16,7 @@
 
 import java.util.Arrays;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -51,7 +52,8 @@
     }
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+            throws HyracksException {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
         if (djr == null) {
             djr = new DatasetJobRecord();
@@ -119,10 +121,10 @@
     }
 
     @Override
-    public synchronized void reportJobFailure(JobId jobId) {
+    public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
         if (djr != null) {
-            djr.fail();
+            djr.fail(exceptions);
         }
         notifyAll();
     }
@@ -196,7 +198,12 @@
         }
 
         if (djr.getStatus() == Status.FAILED) {
-            throw new HyracksDataException("Job failed.");
+            List<Exception> caughtExceptions = djr.getExceptions();
+            if (caughtExceptions == null) {
+                throw new HyracksDataException("Job failed.");
+            } else {
+                throw new HyracksDataException(caughtExceptions.get(caughtExceptions.size() - 1));
+            }
         }
 
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 7954c7c..ab218cc 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -81,6 +81,7 @@
             run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
             ccs.getActiveRunMap().remove(jobId);
             ccs.getRunMapArchive().put(jobId, run);
+            ccs.getRunHistory().put(jobId, run.getExceptions());
             try {
                 ccs.getJobLogFile().log(createJobLogObject(run));
             } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 65e1519..63e62a0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -70,6 +70,7 @@
             run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
             ccs.getActiveRunMap().remove(jobId);
             ccs.getRunMapArchive().put(jobId, run);
+            ccs.getRunHistory().put(jobId, run.getExceptions());
             try {
                 ccs.getJobLogFile().log(createJobLogObject(run));
             } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index f7dd1d2..03d43ed 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -47,6 +48,7 @@
 
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
         CCNCFunctions.NodeRegistrationResult result = null;
+        Map<String, String> ncConfiguration = null;
         try {
             INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
 
@@ -58,6 +60,8 @@
             nodeMap.put(id, state);
             Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
             String ipAddress = state.getNCConfig().dataIPAddress;
+            ncConfiguration = new HashMap<String, String>();
+            state.getNCConfig().toMap(ncConfiguration);
             Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
             if (nodes == null) {
                 nodes = new HashSet<String>();
@@ -75,5 +79,6 @@
             result = new CCNCFunctions.NodeRegistrationResult(null, e);
         }
         ncIPCHandle.send(-1, result, null);
+        ccs.getApplicationContext().notifyNodeJoin(id, ncConfiguration);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 7255503..c82e264 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -65,6 +65,9 @@
                 }
             }
         }
+        if (deadNodes != null && deadNodes.size() > 0) {
+            ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 51eb671..6a04487 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -35,7 +35,7 @@
     @Override
     protected void performEvent(TaskAttempt ta) {
         JobRun run = ccs.getActiveRunMap().get(jobId);
-        ccs.getDatasetDirectoryService().reportJobFailure(jobId);
+        ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
         ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
         run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
index 6cfe025..8efea17 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
@@ -48,17 +50,31 @@
             });
         } else {
             final IJobStatusConditionVariable cArchivedVar = ccs.getRunMapArchive().get(jobId);
-            ccs.getExecutor().execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        cArchivedVar.waitForCompletion();
-                        callback.setValue(null);
-                    } catch (Exception e) {
-                        callback.setException(e);
+            if (cArchivedVar != null) {
+                ccs.getExecutor().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            cArchivedVar.waitForCompletion();
+                            callback.setValue(null);
+                        } catch (Exception e) {
+                            callback.setException(e);
+                        }
                     }
-                }
-            });
+                });
+            } else {
+                final List<Exception> exceptions = ccs.getRunHistory().get(jobId);
+                ccs.getExecutor().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        callback.setValue(null);
+                        if (exceptions != null && exceptions.size() > 0) {
+                            /** only report the first exception because IResultCallback will only throw one exception anyway */
+                            callback.setException(exceptions.get(0));
+                        }
+                    }
+                });
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index ec29592..633d4fb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -16,86 +16,107 @@
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 import org.kohsuke.args4j.Argument;
 import org.kohsuke.args4j.Option;
 import org.kohsuke.args4j.spi.StopOptionHandler;
 
 public class NCConfig implements Serializable {
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
-    public String ccHost;
+	@Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
+	public String ccHost;
 
-    @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
-    public int ccPort = 1099;
+	@Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+	public int ccPort = 1099;
 
-    @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
-    public String clusterNetIPAddress;
+	@Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+	public String clusterNetIPAddress;
 
-    @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
-    public String nodeId;
+	@Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
+	public String nodeId;
 
-    @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
-    public String dataIPAddress;
+	@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
+	public String dataIPAddress;
 
-    @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
-    public String datasetIPAddress;
+	@Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+	public String datasetIPAddress;
 
-    @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
-    public String ioDevices = System.getProperty("java.io.tmpdir");
+	@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
+	public String ioDevices = System.getProperty("java.io.tmpdir");
 
-    @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
-    public int nNetThreads = 1;
+	@Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
+	public int nNetThreads = 1;
 
-    @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
-    public int maxMemory = -1;
+	@Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
+	public int maxMemory = -1;
 
-    @Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 10)")
-    public int resultHistorySize = 100;
+	@Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 10)")
+	public int resultHistorySize = 100;
 
-    @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
-    public int resultManagerMemory = -1;
+	@Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
+	public int resultManagerMemory = -1;
 
-    @Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
-    public String appNCMainClass;
+	@Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
+	public String appNCMainClass;
 
-    @Argument
-    @Option(name = "--", handler = StopOptionHandler.class)
-    public List<String> appArgs;
+	@Argument
+	@Option(name = "--", handler = StopOptionHandler.class)
+	public List<String> appArgs;
 
-    public void toCommandLine(List<String> cList) {
-        cList.add("-cc-host");
-        cList.add(ccHost);
-        cList.add("-cc-port");
-        cList.add(String.valueOf(ccPort));
-        cList.add("-cluster-net-ip-address");
-        cList.add(clusterNetIPAddress);
-        cList.add("-node-id");
-        cList.add(nodeId);
-        cList.add("-data-ip-address");
-        cList.add(dataIPAddress);
-        cList.add(datasetIPAddress);
-        cList.add("-iodevices");
-        cList.add(ioDevices);
-        cList.add("-net-thread-count");
-        cList.add(String.valueOf(nNetThreads));
-        cList.add("-max-memory");
-        cList.add(String.valueOf(maxMemory));
-        cList.add("-result-history-size");
-        cList.add(String.valueOf(resultHistorySize));
-        cList.add("-result-manager-memory");
-        cList.add(String.valueOf(resultManagerMemory));
+	public void toCommandLine(List<String> cList) {
+		cList.add("-cc-host");
+		cList.add(ccHost);
+		cList.add("-cc-port");
+		cList.add(String.valueOf(ccPort));
+		cList.add("-cluster-net-ip-address");
+		cList.add(clusterNetIPAddress);
+		cList.add("-node-id");
+		cList.add(nodeId);
+		cList.add("-data-ip-address");
+		cList.add(dataIPAddress);
+		cList.add(datasetIPAddress);
+		cList.add("-iodevices");
+		cList.add(ioDevices);
+		cList.add("-net-thread-count");
+		cList.add(String.valueOf(nNetThreads));
+		cList.add("-max-memory");
+		cList.add(String.valueOf(maxMemory));
+		cList.add("-result-history-size");
+		cList.add(String.valueOf(resultHistorySize));
+		cList.add("-result-manager-memory");
+		cList.add(String.valueOf(resultManagerMemory));
 
-        if (appNCMainClass != null) {
-            cList.add("-app-nc-main-class");
-            cList.add(appNCMainClass);
-        }
-        if (appArgs != null && !appArgs.isEmpty()) {
-            cList.add("--");
-            for (String appArg : appArgs) {
-                cList.add(appArg);
-            }
-        }
-    }
-}
+		if (appNCMainClass != null) {
+			cList.add("-app-nc-main-class");
+			cList.add(appNCMainClass);
+		}
+		if (appArgs != null && !appArgs.isEmpty()) {
+			cList.add("--");
+			for (String appArg : appArgs) {
+				cList.add(appArg);
+			}
+		}
+	}
+
+	public void toMap(Map<String, String> configuration) {
+		configuration.put("cc-host", ccHost);
+		configuration.put("cc-port", (String.valueOf(ccPort)));
+		configuration.put("cluster-net-ip-address", clusterNetIPAddress);
+		configuration.put("node-id", nodeId);
+		configuration.put("data-ip-address", dataIPAddress);
+		configuration.put("iodevices", ioDevices);
+		configuration.put("net-thread-count", String.valueOf(nNetThreads));
+		configuration.put("max-memory", String.valueOf(maxMemory));
+		configuration.put("result-history-size",
+				String.valueOf(resultHistorySize));
+		configuration.put("result-manager-memory",
+				String.valueOf(resultManagerMemory));
+
+		if (appNCMainClass != null) {
+			configuration.put("app-nc-main-class", appNCMainClass);
+		}
+
+	}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 240322c..fec35ac 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -59,7 +59,8 @@
     }
 
     @Override
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) throws Exception {
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
+            throws Exception {
         CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
                 exceptions);
         ipcHandle.send(-1, fn, null);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index d0e52e3..1db483f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -82,6 +82,8 @@
 
     private final List<Exception> exceptions;
 
+    private List<Throwable> caughtExceptions;
+
     private volatile boolean aborted;
 
     private NodeControllerService ncs;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index c70a1bd..d96872d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -25,6 +25,7 @@
 public class NotifyTaskFailureWork extends AbstractWork {
     private final NodeControllerService ncs;
     private final Task task;
+
     private final List<Exception> exceptions;
 
     public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileSplit.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileSplit.java
index b74d97b..35649ca 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileSplit.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileSplit.java
@@ -23,17 +23,25 @@
     private static final long serialVersionUID = 1L;
 
     private final String nodeName;
-
     private final FileReference file;
+    private final int ioDeviceId;
 
     public FileSplit(String nodeName, FileReference file) {
         this.nodeName = nodeName;
         this.file = file;
+        this.ioDeviceId = 0;
+    }
+
+    public FileSplit(String nodeName, FileReference file, int ioDeviceId) {
+        this.nodeName = nodeName;
+        this.file = file;
+        this.ioDeviceId = ioDeviceId;
     }
 
     public FileSplit(String nodeName, String path) {
         this.nodeName = nodeName;
         this.file = new FileReference(new File(path));
+        this.ioDeviceId = 0;
     }
 
     public String getNodeName() {
@@ -43,4 +51,8 @@
     public FileReference getLocalFile() {
         return file;
     }
+
+    public int getIODeviceId() {
+        return ioDeviceId;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 6c58d6f..0bb514e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -22,6 +22,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -42,13 +44,14 @@
     private final double factor;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
 
     public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) {
+            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -58,6 +61,7 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
         recordDescriptors[0] = recordDescriptor;
@@ -66,7 +70,7 @@
     public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1, IPredicateEvaluatorFactory predEvalFactory) {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -76,6 +80,7 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
@@ -143,12 +148,13 @@
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(rpartAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(spartAid, 0);
             int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator() );
+            
             return new GraceHashJoinOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
                     RPARTITION_ACTIVITY_ID), partition), new TaskId(new ActivityId(getOperatorId(),
                     SPARTITION_ACTIVITY_ID), partition), recordsPerFrame, factor, keys0, keys1, hashFunctionFactories,
                     comparatorFactories, nullWriterFactories1, rd1, rd0, recordDescriptors[0], numPartitions,
-                    isLeftOuter);
+                    predEvaluator, isLeftOuter);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index 91f509d..7c9bd88 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -51,12 +52,13 @@
     private final double factor;
     private final int numPartitions;
     private final boolean isLeftOuter;
+    private final IPredicateEvaluator predEvaluator;
 
     GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
             IBinaryComparatorFactory[] comparatorFactories, INullWriterFactory[] nullWriterFactories,
             RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions,
-            boolean isLeftOuter) {
+            IPredicateEvaluator predEval, boolean isLeftOuter) {
         this.ctx = ctx;
         this.state0Id = state0Id;
         this.state1Id = state1Id;
@@ -70,6 +72,7 @@
         this.numPartitions = numPartitions;
         this.recordsPerFrame = recordsPerFrame;
         this.factor = factor;
+        this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
     }
 
@@ -114,7 +117,7 @@
                 table.reset();
                 InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                         ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
+                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
 
                 // build
                 if (buildWriter != null) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 54a85d2..f7be7c8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -29,6 +29,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -66,6 +68,7 @@
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
 
@@ -87,7 +90,7 @@
     public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
+            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) throws HyracksDataException {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -97,6 +100,7 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
         recordDescriptors[0] = recordDescriptor;
@@ -105,7 +109,7 @@
     public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
+            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
             throws HyracksDataException {
         super(spec, 2, 1);
         this.memsize = memsize;
@@ -116,6 +120,7 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
@@ -189,6 +194,7 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
@@ -315,7 +321,7 @@
                     state.joiner = new InMemoryHashJoin(ctx, tableSize,
                             new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1, table);
+                                    comparators), isLeftOuter, nullWriters1, table, predEvaluator);
                     bufferForPartitions = new ByteBuffer[state.nPartitions];
                     state.fWriters = new RunFileWriter[state.nPartitions];
                     for (int i = 0; i < state.nPartitions; i++) {
@@ -378,6 +384,7 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
@@ -501,7 +508,7 @@
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
                                     hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                    nullWriters1, table);
+                                    nullWriters1, table, predEvaluator);
 
                             if (buildWriter != null) {
                                 RunFileReader buildReader = buildWriter.createReader();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index bc0189e..0f27aa7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -47,18 +48,19 @@
     private final int tableSize;
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
+    private final IPredicateEvaluator predEvaluator;
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
-            ISerializableTable table) throws HyracksDataException {
-        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+            ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException {
+        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, predEval, false);
     }
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
-            ISerializableTable table, boolean reverse) throws HyracksDataException {
+            ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
         this.tableSize = tableSize;
         this.table = table;
         storedTuplePointer = new TuplePointer();
@@ -71,6 +73,7 @@
         tpComparator = comparator;
         outBuffer = ctx.allocateFrame();
         appender.reset(outBuffer, true);
+        predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
             int fieldCountOuter = accessor1.getFieldCount();
@@ -103,25 +106,28 @@
         accessorProbe.reset(buffer);
         int tupleCount0 = accessorProbe.getTupleCount();
         for (int i = 0; i < tupleCount0; ++i) {
-            int entry = tpcProbe.partition(accessorProbe, i, tableSize);
-            boolean matchFound = false;
-            int offset = 0;
-            do {
-                table.getTuplePointer(entry, offset++, storedTuplePointer);
-                if (storedTuplePointer.frameIndex < 0)
-                    break;
-                int bIndex = storedTuplePointer.frameIndex;
-                int tIndex = storedTuplePointer.tupleIndex;
-                accessorBuild.reset(buffers.get(bIndex));
-                int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
-                if (c == 0) {
-                    matchFound = true;
-                    appendToResult(i, tIndex, writer);
-                }
-            } while (true);
-
+        	boolean matchFound = false;
+        	if(tableSize != 0){
+        		int entry = tpcProbe.partition(accessorProbe, i, tableSize);
+                int offset = 0;
+                do {
+                    table.getTuplePointer(entry, offset++, storedTuplePointer);
+                    if (storedTuplePointer.frameIndex < 0)
+                        break;
+                    int bIndex = storedTuplePointer.frameIndex;
+                    int tIndex = storedTuplePointer.tupleIndex;
+                    accessorBuild.reset(buffers.get(bIndex));
+                    int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
+                    if (c == 0) {
+                    	boolean predEval = ( (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, i, accessorBuild, tIndex) );
+                    	if(predEval){
+                    		matchFound = true;
+                            appendToResult(i, tIndex, writer);
+                    	}
+                    }
+                } while (true);
+        	}
             if (!matchFound && isLeftOuter) {
-
                 if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
                         nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
                     flushFrame(outBuffer, writer);
@@ -132,7 +138,6 @@
                                 + appender.getBuffer().capacity() + ")");
                     }
                 }
-
             }
         }
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index e0a5613..705923a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -29,6 +29,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -53,18 +55,20 @@
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
     private final int tableSize;
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int tableSize) {
+            RecordDescriptor recordDescriptor, int tableSize, IPredicateEvaluatorFactory predEvalFactory) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
@@ -73,18 +77,34 @@
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
             int tableSize) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
         this.tableSize = tableSize;
     }
+    
+    public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, int tableSize) {
+    	this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null);
+    }
+    
+    public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            int tableSize) {
+    	this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories,null,recordDescriptor,isLeftOuter,nullWriterFactories1,tableSize);
+    }
+    
+    
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
@@ -150,6 +170,7 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildTaskState state;
@@ -166,7 +187,7 @@
                     state.joiner = new InMemoryHashJoin(ctx, tableSize,
                             new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1, table);
+                                    comparators), isLeftOuter, nullWriters1, table, predEvaluator);
                 }
 
                 @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 4312e72..d365864 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -47,9 +48,10 @@
     private final RunFileWriter runFileWriter;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuilder;
-
+    private final IPredicateEvaluator predEvaluator;
+    
     public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
-            ITuplePairComparator comparators, int memSize, boolean isLeftOuter, INullWriter[] nullWriters1)
+            ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, INullWriter[] nullWriters1)
             throws HyracksDataException {
         this.accessorInner = accessor1;
         this.accessorOuter = accessor0;
@@ -60,6 +62,7 @@
         this.appender.reset(outBuffer, true);
         this.outBuffers = new ArrayList<ByteBuffer>();
         this.memSize = memSize;
+        this.predEvaluator = predEval;
         this.ctx = ctx;
 
         this.isLeftOuter = isLeftOuter;
@@ -130,8 +133,9 @@
             boolean matchFound = false;
             for (int j = 0; j < tupleCount1; ++j) {
                 int c = compare(accessorOuter, i, accessorInner, j);
-                if (c == 0) {
-                    matchFound = true;
+                boolean prdEval = (predEvaluator == null) || (predEvaluator.evaluate(accessorOuter, i, accessorInner, j));
+                if (c == 0 && prdEval) {
+                	matchFound = true;
                     if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
                         flushFrame(outBuffer, writer);
                         appender.reset(outBuffer, true);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 0be01c1..d3f664e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -27,6 +27,8 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -49,16 +51,24 @@
     private static final long serialVersionUID = 1L;
     private final ITuplePairComparatorFactory comparatorFactory;
     private final int memSize;
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
-
+    
     public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
             ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
             boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+        this(spec, comparatorFactory, recordDescriptor, memSize, null, isLeftOuter, nullWriterFactories1);
+    }
+    
+    public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
+            IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
         super(spec, 2, 1);
         this.comparatorFactory = comparatorFactory;
         this.recordDescriptors[0] = recordDescriptor;
         this.memSize = memSize;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
     }
@@ -117,7 +127,8 @@
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-
+            final IPredicateEvaluator predEvaluator = ( (predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null);
+            
             final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
                 for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -134,7 +145,7 @@
                             partition));
 
                     state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
-                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, isLeftOuter,
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, predEvaluator, isLeftOuter,
                             nullWriters1);
 
                 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index a3b81e4..175ef80 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -9,6 +9,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -53,7 +54,8 @@
 
     private RunFileWriter[] buildRFWriters; //writing spilled build partitions
     private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
+    
+    private final IPredicateEvaluator predEvaluator;
     private final boolean isLeftOuter;
     private final INullWriter[] nullWriters1;
 
@@ -75,18 +77,18 @@
     private FrameTupleAppender probeTupAppenderToSpilled;
 
     private int numOfSpilledParts;
-    private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
-    private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
-    private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning 
-
+    private ByteBuffer[] sPartBuffs;    //Buffers for probe spilled partitions (one buffer per spilled partition)
+    private ByteBuffer probeResBuff;    //Buffer for probe resident partition tuples
+    private ByteBuffer reloadBuffer;    //Buffer for reloading spilled partitions during partition tuning 
+    
     private int[] buildPSizeInFrames; //Used for partition tuning
     private int freeFramesCounter; //Used for partition tuning
-
-    private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
-
+    
+    private boolean isTableEmpty;	//Added for handling the case, where build side is empty (tableSize is 0)
+    
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
             String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
-            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc) {
+            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval) {
         this.ctx = ctx;
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
@@ -106,6 +108,7 @@
         this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
         this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
 
+        this.predEvaluator = predEval;
         this.isLeftOuter = false;
         this.nullWriters1 = null;
 
@@ -114,7 +117,7 @@
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
             String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
             RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
-            boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+            IPredicateEvaluator predEval, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
         this.ctx = ctx;
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
@@ -133,7 +136,8 @@
 
         this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
         this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
-
+        
+        this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
 
         this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
@@ -423,7 +427,7 @@
         this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
                 new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
                         ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
-                        comparators), isLeftOuter, nullWriters1, table);
+                        comparators), isLeftOuter, nullWriters1, table, predEvaluator);
     }
 
     private void cacheInMemJoin() throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 13db753..1068a95 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -30,6 +30,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -111,7 +113,8 @@
     private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
     private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
     private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
-
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
+    
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
 
@@ -119,7 +122,7 @@
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
             ITuplePairComparatorFactory tupPaircomparatorFactory0,
-            ITuplePairComparatorFactory tupPaircomparatorFactory1, boolean isLeftOuter,
+            ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory, boolean isLeftOuter,
             INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
 
         super(spec, 2, 1);
@@ -133,6 +136,7 @@
         this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
         this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
         recordDescriptors[0] = recordDescriptor;
+        this.predEvaluatorFactory = predEvaluatorFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
         
@@ -142,7 +146,7 @@
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
+            ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
             throws HyracksDataException {
 
         super(spec, 2, 1);
@@ -155,6 +159,7 @@
         this.comparatorFactories = comparatorFactories;
         this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
         this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
+        this.predEvaluatorFactory = predEvaluatorFactory;
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
@@ -255,6 +260,8 @@
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
             
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+            
             
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
@@ -276,12 +283,12 @@
                     if(!isLeftOuter){
                     	state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
                                 PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
-                                buildHpc);
+                                buildHpc, predEvaluator);
                     }
                     else{
                     	state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
                                 PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
-                                buildHpc, isLeftOuter, nullWriterFactories1);
+                                buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1);
                     }
                     
                     state.hybridHJ.initBuild();
@@ -336,7 +343,8 @@
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
             final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
-
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+            
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
@@ -372,9 +380,7 @@
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                	if(!state.hybridHJ.isTableEmpty()){
-                		state.hybridHJ.probe(buffer, writer);
-                	}
+                	state.hybridHJ.probe(buffer, writer);
                 }
 
                 @Override
@@ -459,7 +465,7 @@
                                     nPartitions);
                            
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
-                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
+                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator);
 
                             buildSideReader.open();
                             rHHj.initBuild();
@@ -521,7 +527,7 @@
                                     nPartitions);
                             
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
-                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
+                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
 
                             probeSideReader.open();
                             rHHj.initBuild();
@@ -589,7 +595,7 @@
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
                             ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
                             buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
-                            isLeftOuter, nullWriters1, table, reverse);
+                            isLeftOuter, nullWriters1, table, predEvaluator, reverse);
 
                     bReader.open();
                     rPartbuff.clear();
@@ -617,7 +623,7 @@
                         throws HyracksDataException {
 
                     NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
-                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, false, null);
+                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);
 
                     ByteBuffer cacheBuff = ctx.allocateFrame();
                     innerReader.open();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
index 0999ee8..b7fd241 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
@@ -90,7 +90,7 @@
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
                         PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
@@ -168,7 +168,7 @@
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
                         PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
@@ -247,7 +247,7 @@
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
                         PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index b5eb850..9b35867 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -122,7 +122,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128);
+                custOrderJoinDesc, 128, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -205,7 +205,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc);
+                custOrderJoinDesc, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -288,7 +288,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc);
+                custOrderJoinDesc, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -372,7 +372,7 @@
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, true, nullWriterFactories, 128);
+                null, custOrderJoinDesc, true, nullWriterFactories, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -460,7 +460,7 @@
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, true, nullWriterFactories);
+                custOrderJoinDesc, true, nullWriterFactories, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -548,7 +548,7 @@
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, true, nullWriterFactories);
+                custOrderJoinDesc, null, true, nullWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -629,7 +629,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128);
+                custOrderJoinDesc, 128, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -720,7 +720,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc);
+                custOrderJoinDesc, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -811,7 +811,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc);
+                custOrderJoinDesc, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -898,7 +898,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128);
+                custOrderJoinDesc, 128, null);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -991,7 +991,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128);
+                custOrderJoinDesc, 128, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 6df9ff8..8dab4de 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -210,7 +210,7 @@
                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                             .of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc);
+                    custOrderJoinDesc, null);
 
         } else if ("hybridhash".equalsIgnoreCase(algo)) {
             join = new HybridHashJoinOperatorDescriptor(
@@ -224,7 +224,7 @@
                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                             .of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc);
+                    custOrderJoinDesc, null);
 
         } else {
             join = new InMemoryHashJoinOperatorDescriptor(
@@ -234,7 +234,7 @@
                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                             .of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc, 6000000);
+                    custOrderJoinDesc, 6000000, null);
         }
 
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
index 354aa1e..5027dc7 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
@@ -82,6 +82,16 @@
     }
 
     @Override
+    public int getMaxTupleSize(int pageSize) {
+        return (pageSize - getPageHeaderSize()) / 2;
+    }
+
+    @Override
+    public int getBytesRequriedToWriteTuple(ITupleReference tuple) {
+        return tupleWriter.bytesRequired(tuple) + slotManager.getSlotSize();
+    }
+
+    @Override
     public void setPage(ICachedPage page) {
         this.page = page;
         this.buf = page.getBuffer();
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
index 90b167f..8753d6b 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
@@ -54,6 +54,11 @@
     }
 
     @Override
+    public int getBytesRequriedToWriteTuple(ITupleReference tuple) {
+        return tupleWriter.bytesRequired(tuple) + childPtrSize + slotManager.getSlotSize();
+    }
+
+    @Override
     public void initBuffer(byte level) {
         super.initBuffer(level);
         buf.putInt(rightLeafOff, -1);
@@ -185,7 +190,7 @@
         ITreeIndexFrame targetFrame = null;
 
         int totalSize = 0;
-        int halfPageSize = buf.capacity() / 2 - getPageHeaderSize();
+        int halfPageSize = (buf.capacity() - getPageHeaderSize()) / 2;
         int i;
         for (i = 0; i < tupleCount; ++i) {
             frameTuple.resetByTupleIndex(this, i);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
index 04b3077..bfb1eca 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
@@ -46,6 +46,11 @@
     }
 
     @Override
+    public int getBytesRequriedToWriteTuple(ITupleReference tuple) {
+        return tupleWriter.bytesRequired(tuple) + slotManager.getSlotSize();
+    }
+    
+    @Override
     public void initBuffer(byte level) {
         super.initBuffer(level);
         buf.putInt(nextLeafOff, -1);
@@ -146,7 +151,7 @@
         int tuplesToLeft;
         ITreeIndexFrame targetFrame = null;
         int totalSize = 0;
-        int halfPageSize = buf.capacity() / 2 - getPageHeaderSize();
+        int halfPageSize = (buf.capacity() - getPageHeaderSize()) / 2;
         int i;
         for (i = 0; i < tupleCount; ++i) {
             frameTuple.resetByTupleIndex(this, i);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index 86bc32a..8846873 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
@@ -74,6 +75,7 @@
 
     private final AtomicInteger smoCounter;
     private final ReadWriteLock treeLatch;
+    private final int maxTupleSize;
 
     public BTree(IBufferCache bufferCache, IFileMapProvider fileMapProvider, IFreePageManager freePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory leafFrameFactory,
@@ -82,6 +84,10 @@
                 fieldCount, file);
         this.treeLatch = new ReentrantReadWriteLock(true);
         this.smoCounter = new AtomicInteger();
+        ITreeIndexFrame leafFrame = leafFrameFactory.createFrame();
+        ITreeIndexFrame interiorFrame = interiorFrameFactory.createFrame();
+        maxTupleSize = Math.min(leafFrame.getMaxTupleSize(bufferCache.getPageSize()),
+                interiorFrame.getMaxTupleSize(bufferCache.getPageSize()));
     }
 
     private void diskOrderScan(ITreeIndexCursor icursor, BTreeOpContext ctx) throws HyracksDataException {
@@ -304,11 +310,23 @@
     }
 
     private void insert(ITupleReference tuple, BTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
+        int tupleSize = Math.max(ctx.leafFrame.getBytesRequriedToWriteTuple(tuple),
+                ctx.interiorFrame.getBytesRequriedToWriteTuple(tuple));
+        if (tupleSize > maxTupleSize) {
+            throw new TreeIndexException("Space required for record (" + tupleSize
+                    + ") larger than maximum acceptable size (" + maxTupleSize + ")");
+        }
         ctx.modificationCallback.before(tuple);
         insertUpdateOrDelete(tuple, ctx);
     }
 
     private void upsert(ITupleReference tuple, BTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
+        int tupleSize = Math.max(ctx.leafFrame.getBytesRequriedToWriteTuple(tuple),
+                ctx.interiorFrame.getBytesRequriedToWriteTuple(tuple));
+        if (tupleSize > maxTupleSize) {
+            throw new TreeIndexException("Space required for record (" + tupleSize
+                    + ") larger than maximum acceptable size (" + maxTupleSize + ")");
+        }
         ctx.modificationCallback.before(tuple);
         insertUpdateOrDelete(tuple, ctx);
     }
@@ -320,6 +338,12 @@
         if (fieldCount == ctx.cmp.getKeyFieldCount()) {
             throw new BTreeNotUpdateableException("Cannot perform updates when the entire tuple forms the key.");
         }
+        int tupleSize = Math.max(ctx.leafFrame.getBytesRequriedToWriteTuple(tuple),
+                ctx.interiorFrame.getBytesRequriedToWriteTuple(tuple));
+        if (tupleSize > maxTupleSize) {
+            throw new TreeIndexException("Space required for record (" + tupleSize
+                    + ") larger than maximum acceptable size (" + maxTupleSize + ")");
+        }
         ctx.modificationCallback.before(tuple);
         insertUpdateOrDelete(tuple, ctx);
     }
@@ -933,6 +957,13 @@
         @Override
         public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
             try {
+                int tupleSize = Math.max(leafFrame.getBytesRequriedToWriteTuple(tuple),
+                        interiorFrame.getBytesRequriedToWriteTuple(tuple));
+                if (tupleSize > maxTupleSize) {
+                    throw new TreeIndexException("Space required for record (" + tupleSize
+                            + ") larger than maximum acceptable size (" + maxTupleSize + ")");
+                }
+
                 NodeFrontier leafFrontier = nodeFrontiers.get(0);
 
                 int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexFrame.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexFrame.java
index 612af25..4a14505 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexFrame.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexFrame.java
@@ -59,6 +59,10 @@
 
     public ByteBuffer getBuffer();
 
+    public int getMaxTupleSize(int pageSize);
+
+    public int getBytesRequriedToWriteTuple(ITupleReference tuple);
+
     // for debugging
     public String printHeader();
 
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index e46efff..22c6637 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.hyracks.storage.am.common.dataflow;
 
+import java.io.File;
 import java.io.IOException;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -23,6 +24,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.util.IndexFileNameUtil;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactory;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
 import edu.uci.ics.hyracks.storage.common.file.LocalResource;
@@ -37,6 +39,7 @@
     protected final ResourceIdFactory resourceIdFactory;
     protected final FileReference file;
     protected final int partition;
+    protected final int ioDeviceId;
 
     protected IIndex index;
 
@@ -47,7 +50,9 @@
         this.localResourceRepository = opDesc.getStorageManager().getLocalResourceRepository(ctx);
         this.resourceIdFactory = opDesc.getStorageManager().getResourceIdFactory(ctx);
         this.partition = partition;
-        this.file = opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile();
+        this.ioDeviceId = opDesc.getFileSplitProvider().getFileSplits()[partition].getIODeviceId();
+        this.file = new FileReference(new File(IndexFileNameUtil.prepareFileName(opDesc.getFileSplitProvider()
+                .getFileSplits()[partition].getLocalFile().getFile().getPath(), ioDeviceId)));
     }
 
     protected abstract IIndex createIndexInstance() throws HyracksDataException;
@@ -70,7 +75,7 @@
             // any physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource ID). 
             // Once the index has been created, a new resource ID can be generated.
             if (resourceID != -1) {
-                localResourceRepository.deleteResourceByName(file.getFile().getPath());
+                localResourceRepository.deleteResourceByName(file.getFile().getPath(), ioDeviceId);
             }
             index.create();
             try {
@@ -78,8 +83,9 @@
                 resourceID = resourceIdFactory.createId();
                 ILocalResourceFactory localResourceFactory = opDesc.getLocalResourceFactoryProvider()
                         .getLocalResourceFactory();
-                localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
-                        .getPath(), partition));
+                localResourceRepository.insert(
+                        localResourceFactory.createLocalResource(resourceID, file.getFile().getPath(), partition),
+                        ioDeviceId);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -121,7 +127,7 @@
             }
 
             if (resourceID != -1) {
-                localResourceRepository.deleteResourceByName(file.getFile().getPath());
+                localResourceRepository.deleteResourceByName(file.getFile().getPath(), ioDeviceId);
             }
             index.destroy();
         }
@@ -143,4 +149,4 @@
     public IHyracksTaskContext getTaskContext() {
         return ctx;
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/TreeIndexNSMFrame.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/TreeIndexNSMFrame.java
index 31ce573..2e97b21 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/TreeIndexNSMFrame.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/TreeIndexNSMFrame.java
@@ -61,6 +61,11 @@
     }
 
     @Override
+    public int getMaxTupleSize(int pageSize) {
+        return (pageSize - getPageHeaderSize()) / 2;
+    }
+
+    @Override
     public boolean isLeaf() {
         return buf.get(levelOff) == 0;
     }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java
new file mode 100644
index 0000000..fbab3cf
--- /dev/null
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.common.util;
+
+import java.io.File;
+
+public class IndexFileNameUtil {
+
+    public static String prepareFileName(String path, int ioDeviceId) {
+        return path + File.separator + "device_id_" + ioDeviceId;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
index 79f48fa..eb2e760 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
@@ -64,6 +64,7 @@
                 .getStorageManager().getBufferCache(ctx), opDesc.getStorageManager().getFileMapProvider(ctx),
                 treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc.getTreeIndexComparatorFactories(), treeOpDesc
                         .getTreeIndexBloomFilterKeyFields(), bloomFilterFalsePositiveRate, mergePolicy,
-                opTrackerFactory, ioScheduler, ioOpCallbackProvider, partition);
+                opTrackerFactory, ioScheduler, ioOpCallbackProvider,
+                opDesc.getFileSplitProvider().getFileSplits()[partition].getIODeviceId());
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index bbcee06..0080c42 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
@@ -41,8 +40,8 @@
     private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
 
     public LSMBTreeFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
-            TreeIndexFactory<? extends ITreeIndex> btreeFactory, int startIODeviceIndex) {
-        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, int ioDeviceId) {
+        super(ioManager, fileMapProvider, file, null, ioDeviceId);
         this.btreeFactory = btreeFactory;
     }
 
@@ -80,17 +79,16 @@
         ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<ComparableFileName>();
         ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<ComparableFileName>();
 
-        // Gather files from all IODeviceHandles.
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
-            // List of valid BTree files.
-            cleanupAndGetValidFilesInternal(dev, btreeFilter, btreeFactory, allBTreeFiles);
-            HashSet<String> btreeFilesSet = new HashSet<String>();
-            for (ComparableFileName cmpFileName : allBTreeFiles) {
-                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
-                btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
-            }
-            validateFiles(dev, btreeFilesSet, allBloomFilterFiles, bloomFilterFilter, null);
+        // Gather files from the IODeviceHandle.
+
+        // List of valid BTree files.
+        cleanupAndGetValidFilesInternal(dev, btreeFilter, btreeFactory, allBTreeFiles);
+        HashSet<String> btreeFilesSet = new HashSet<String>();
+        for (ComparableFileName cmpFileName : allBTreeFiles) {
+            int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+            btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
         }
+        validateFiles(dev, btreeFilesSet, allBloomFilterFiles, bloomFilterFilter, null);
 
         // Sanity check.
         if (allBTreeFiles.size() != allBloomFilterFiles.size()) {
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
index 9033797..d92e93d 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
@@ -45,25 +45,12 @@
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public class LSMBTreeUtils {
-
     public static LSMBTree createLSMTree(IInMemoryBufferCache memBufferCache,
             IInMemoryFreePageManager memFreePageManager, IIOManager ioManager, FileReference file,
             IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
             ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
-            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
-        return createLSMTree(memBufferCache, memFreePageManager, ioManager, file, diskBufferCache, diskFileMapProvider,
-                typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, mergePolicy,
-                opTrackerFactory, ioScheduler, ioOpCallbackProvider, 0);
-    }
-
-    public static LSMBTree createLSMTree(IInMemoryBufferCache memBufferCache,
-            IInMemoryFreePageManager memFreePageManager, IIOManager ioManager, FileReference file,
-            IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ITypeTraits[] typeTraits,
-            IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
-            ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
-            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
-            int startIODeviceIndex) {
+            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider, int ioDeviceId) {
         LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
                 cmpFactories.length, false);
         LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
@@ -88,7 +75,7 @@
                 bloomFilterKeyFields);
 
         ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, diskFileMapProvider, file,
-                diskBTreeFactory, startIODeviceIndex);
+                diskBTreeFactory, ioDeviceId);
 
         LSMBTree lsmTree = new LSMBTree(memBufferCache, memFreePageManager, interiorFrameFactory,
                 insertLeafFrameFactory, deleteLeafFrameFactory, fileNameManager, diskBTreeFactory,
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index a808143..99b62d8 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -44,10 +44,8 @@
     protected static final String SPLIT_STRING = "_";
     protected static final String BLOOM_FILTER_STRING = "f";
 
-    // Use all IODevices registered in ioManager in a round-robin fashion to choose
-    // where to flush and merge
-    protected final IIOManager ioManager;
     protected final IFileMapProvider fileMapProvider;
+    protected final IODeviceHandle dev;
 
     // baseDir should reflect dataset name and partition name.
     protected String baseDir;
@@ -57,19 +55,15 @@
 
     protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
 
-    // The current index for the round-robin file assignment
-    private int ioDeviceIndex = 0;
-
     public AbstractLSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
-            TreeIndexFactory<? extends ITreeIndex> treeFactory, int startIODeviceIndex) {
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, int ioDeviceId) {
         this.baseDir = file.getFile().getPath();
         if (!baseDir.endsWith(System.getProperty("file.separator"))) {
             baseDir += System.getProperty("file.separator");
         }
         this.fileMapProvider = fileMapProvider;
-        this.ioManager = ioManager;
         this.treeFactory = treeFactory;
-        ioDeviceIndex = startIODeviceIndex % ioManager.getIODevices().size();
+        this.dev = ioManager.getIODevices().get(ioDeviceId);
     }
 
     private static FilenameFilter fileNameFilter = new FilenameFilter() {
@@ -135,18 +129,14 @@
 
     @Override
     public void createDirs() {
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
-            File f = new File(dev.getPath(), baseDir);
-            f.mkdirs();
-        }
+        File f = new File(dev.getPath(), baseDir);
+        f.mkdirs();
     }
 
     @Override
     public void deleteDirs() {
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
-            File f = new File(dev.getPath(), baseDir);
-            delete(f);
-        }
+        File f = new File(dev.getPath(), baseDir);
+        delete(f);
     }
 
     private void delete(File f) {
@@ -165,9 +155,6 @@
     };
 
     protected FileReference createFlushFile(String relFlushFileName) {
-        // Assigns new files to I/O devices in round-robin fashion.
-        IODeviceHandle dev = ioManager.getIODevices().get(ioDeviceIndex);
-        ioDeviceIndex = (ioDeviceIndex + 1) % ioManager.getIODevices().size();
         return dev.createFileReference(relFlushFileName);
     }
 
@@ -198,14 +185,12 @@
         List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
         ArrayList<ComparableFileName> allFiles = new ArrayList<ComparableFileName>();
 
-        // Gather files from all IODeviceHandles and delete invalid files
+        // Gather files from the IODeviceHandle and delete invalid files
         // There are two types of invalid files:
         // (1) The isValid flag is not set
         // (2) The file's interval is contained by some other file
         // Here, we only filter out (1).
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
-            cleanupAndGetValidFilesInternal(dev, fileNameFilter, treeFactory, allFiles);
-        }
+        cleanupAndGetValidFilesInternal(dev, fileNameFilter, treeFactory, allFiles);
 
         if (allFiles.isEmpty()) {
             return validFiles;
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelper.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelper.java
index c997f9d..c97e10d 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelper.java
@@ -67,11 +67,12 @@
             IBufferCache diskBufferCache = opDesc.getStorageManager().getBufferCache(ctx);
             IFileMapProvider diskFileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
             LSMInvertedIndex invIndex = InvertedIndexUtils.createLSMInvertedIndex(memBufferCache, memFreePageManager,
-                    diskFileMapProvider, invIndexOpDesc.getInvListsTypeTraits(),
-                    invIndexOpDesc.getInvListsComparatorFactories(), invIndexOpDesc.getTokenTypeTraits(),
-                    invIndexOpDesc.getTokenComparatorFactories(), invIndexOpDesc.getTokenizerFactory(),
-                    diskBufferCache, ctx.getIOManager(), file.getFile().getPath(), bloomFilterFalsePositiveRate,
-                    mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider, partition);
+                    diskFileMapProvider, invIndexOpDesc.getInvListsTypeTraits(), invIndexOpDesc
+                            .getInvListsComparatorFactories(), invIndexOpDesc.getTokenTypeTraits(), invIndexOpDesc
+                            .getTokenComparatorFactories(), invIndexOpDesc.getTokenizerFactory(), diskBufferCache, ctx
+                            .getIOManager(), file.getFile().getPath(), bloomFilterFalsePositiveRate, mergePolicy,
+                    opTrackerFactory, ioScheduler, ioOpCallbackProvider,
+                    opDesc.getFileSplitProvider().getFileSplits()[partition].getIODeviceId());
             return invIndex;
         } catch (IndexException e) {
             throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
index 9427275..6c5b25f 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
@@ -67,11 +67,12 @@
             IBufferCache diskBufferCache = opDesc.getStorageManager().getBufferCache(ctx);
             IFileMapProvider diskFileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
             PartitionedLSMInvertedIndex invIndex = InvertedIndexUtils.createPartitionedLSMInvertedIndex(memBufferCache,
-                    memFreePageManager, diskFileMapProvider, invIndexOpDesc.getInvListsTypeTraits(),
-                    invIndexOpDesc.getInvListsComparatorFactories(), invIndexOpDesc.getTokenTypeTraits(),
-                    invIndexOpDesc.getTokenComparatorFactories(), invIndexOpDesc.getTokenizerFactory(),
-                    diskBufferCache, ctx.getIOManager(), file.getFile().getPath(), bloomFilterFalsePositiveRate,
-                    mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider, partition);
+                    memFreePageManager, diskFileMapProvider, invIndexOpDesc.getInvListsTypeTraits(), invIndexOpDesc
+                            .getInvListsComparatorFactories(), invIndexOpDesc.getTokenTypeTraits(), invIndexOpDesc
+                            .getTokenComparatorFactories(), invIndexOpDesc.getTokenizerFactory(), diskBufferCache, ctx
+                            .getIOManager(), file.getFile().getPath(), bloomFilterFalsePositiveRate, mergePolicy,
+                    opTrackerFactory, ioScheduler, ioOpCallbackProvider,
+                    opDesc.getFileSplitProvider().getFileSplits()[partition].getIODeviceId());
             return invIndex;
         } catch (IndexException e) {
             throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index ccba624..aa1dbeb 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
@@ -63,8 +62,8 @@
     };
 
     public LSMInvertedIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
-            BTreeFactory btreeFactory, int startIODeviceIndex) {
-        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+            BTreeFactory btreeFactory, int ioDeviceId) {
+        super(ioManager, fileMapProvider, file, null, ioDeviceId);
         this.btreeFactory = btreeFactory;
     }
 
@@ -100,20 +99,19 @@
         ArrayList<ComparableFileName> allDeletedKeysBTreeFiles = new ArrayList<ComparableFileName>();
         ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<ComparableFileName>();
 
-        // Gather files from all IODeviceHandles.
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
-            cleanupAndGetValidFilesInternal(dev, deletedKeysBTreeFilter, btreeFactory, allDeletedKeysBTreeFiles);
-            HashSet<String> deletedKeysBTreeFilesSet = new HashSet<String>();
-            for (ComparableFileName cmpFileName : allDeletedKeysBTreeFiles) {
-                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
-                deletedKeysBTreeFilesSet.add(cmpFileName.fileName.substring(0, index));
-            }
-
-            // TODO: do we really need to validate the inverted lists files or is validating the dict. BTrees is enough?
-            validateFiles(dev, deletedKeysBTreeFilesSet, allInvListsFiles, invListFilter, null);
-            validateFiles(dev, deletedKeysBTreeFilesSet, allDictBTreeFiles, dictBTreeFilter, btreeFactory);
-            validateFiles(dev, deletedKeysBTreeFilesSet, allBloomFilterFiles, bloomFilterFilter, null);
+        // Gather files from the IODeviceHandle.
+        cleanupAndGetValidFilesInternal(dev, deletedKeysBTreeFilter, btreeFactory, allDeletedKeysBTreeFiles);
+        HashSet<String> deletedKeysBTreeFilesSet = new HashSet<String>();
+        for (ComparableFileName cmpFileName : allDeletedKeysBTreeFiles) {
+            int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+            deletedKeysBTreeFilesSet.add(cmpFileName.fileName.substring(0, index));
         }
+
+        // TODO: do we really need to validate the inverted lists files or is validating the dict. BTrees is enough?
+        validateFiles(dev, deletedKeysBTreeFilesSet, allInvListsFiles, invListFilter, null);
+        validateFiles(dev, deletedKeysBTreeFilesSet, allDictBTreeFiles, dictBTreeFilter, btreeFactory);
+        validateFiles(dev, deletedKeysBTreeFilesSet, allBloomFilterFiles, bloomFilterFilter, null);
+
         // Sanity check.
         if (allDictBTreeFiles.size() != allInvListsFiles.size()
                 || allDictBTreeFiles.size() != allDeletedKeysBTreeFiles.size()
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
index a3ff152..06be7aa 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
@@ -124,21 +124,7 @@
             IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
             String onDiskDir, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
             ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackProvider ioOpCallbackProvider) throws IndexException {
-        return createLSMInvertedIndex(memBufferCache, memFreePageManager, diskFileMapProvider, invListTypeTraits,
-                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, diskBufferCache, ioManager,
-                onDiskDir, bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory, ioScheduler,
-                ioOpCallbackProvider, 0);
-    }
-
-    public static LSMInvertedIndex createLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
-            IInMemoryFreePageManager memFreePageManager, IFileMapProvider diskFileMapProvider,
-            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
-            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
-            IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
-            String onDiskDir, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
-            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackProvider ioOpCallbackProvider, int startIODeviceIndex) throws IndexException {
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider, int ioDeviceId) throws IndexException {
 
         BTreeFactory deletedKeysBTreeFactory = createDeletedKeysBTreeFactory(diskFileMapProvider, invListTypeTraits,
                 invListCmpFactories, diskBufferCache);
@@ -152,7 +138,7 @@
 
         FileReference onDiskDirFileRef = new FileReference(new File(onDiskDir));
         LSMInvertedIndexFileManager fileManager = new LSMInvertedIndexFileManager(ioManager, diskFileMapProvider,
-                onDiskDirFileRef, deletedKeysBTreeFactory, startIODeviceIndex);
+                onDiskDirFileRef, deletedKeysBTreeFactory, ioDeviceId);
 
         IInvertedListBuilderFactory invListBuilderFactory = new FixedSizeElementInvertedListBuilderFactory(
                 invListTypeTraits);
@@ -174,21 +160,7 @@
             IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
             String onDiskDir, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
             ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackProvider ioOpCallbackProvider) throws IndexException {
-        return createPartitionedLSMInvertedIndex(memBufferCache, memFreePageManager, diskFileMapProvider,
-                invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory,
-                diskBufferCache, ioManager, onDiskDir, bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory,
-                ioScheduler, ioOpCallbackProvider, 0);
-    }
-
-    public static PartitionedLSMInvertedIndex createPartitionedLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
-            IInMemoryFreePageManager memFreePageManager, IFileMapProvider diskFileMapProvider,
-            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
-            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
-            IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
-            String onDiskDir, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
-            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackProvider ioOpCallbackProvider, int startIODeviceIndex) throws IndexException {
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider, int ioDeviceId) throws IndexException {
 
         BTreeFactory deletedKeysBTreeFactory = createDeletedKeysBTreeFactory(diskFileMapProvider, invListTypeTraits,
                 invListCmpFactories, diskBufferCache);
@@ -202,7 +174,7 @@
 
         FileReference onDiskDirFileRef = new FileReference(new File(onDiskDir));
         LSMInvertedIndexFileManager fileManager = new LSMInvertedIndexFileManager(ioManager, diskFileMapProvider,
-                onDiskDirFileRef, deletedKeysBTreeFactory, startIODeviceIndex);
+                onDiskDirFileRef, deletedKeysBTreeFactory, ioDeviceId);
 
         IInvertedListBuilderFactory invListBuilderFactory = new FixedSizeElementInvertedListBuilderFactory(
                 invListTypeTraits);
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
index 49e0a23..5ab5513 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
@@ -72,7 +72,8 @@
             return LSMRTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file, diskBufferCache,
                     diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories, valueProviderFactories,
                     rtreePolicyType, bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory, ioScheduler,
-                    ioOpCallbackProvider, linearizeCmpFactory, startIODeviceIndex);
+                    ioOpCallbackProvider, linearizeCmpFactory,
+                    opDesc.getFileSplitProvider().getFileSplits()[partition].getIODeviceId());
         } catch (TreeIndexException e) {
             throw new HyracksDataException(e);
         }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 851235e..fd36920 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
@@ -56,8 +55,8 @@
 
     public LSMRTreeFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> rtreeFactory, TreeIndexFactory<? extends ITreeIndex> btreeFactory,
-            int startIODeviceIndex) {
-        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+            int ioDeviceId) {
+        super(ioManager, fileMapProvider, file, null, ioDeviceId);
         this.rtreeFactory = rtreeFactory;
         this.btreeFactory = btreeFactory;
     }
@@ -93,17 +92,16 @@
         ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<ComparableFileName>();
         ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<ComparableFileName>();
 
-        // Gather files from all IODeviceHandles.
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
-            cleanupAndGetValidFilesInternal(dev, btreeFilter, btreeFactory, allBTreeFiles);
-            HashSet<String> btreeFilesSet = new HashSet<String>();
-            for (ComparableFileName cmpFileName : allBTreeFiles) {
-                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
-                btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
-            }
-            validateFiles(dev, btreeFilesSet, allRTreeFiles, rtreeFilter, rtreeFactory);
-            validateFiles(dev, btreeFilesSet, allBloomFilterFiles, bloomFilterFilter, null);
+        // Gather files from the IODeviceHandle.
+        cleanupAndGetValidFilesInternal(dev, btreeFilter, btreeFactory, allBTreeFiles);
+        HashSet<String> btreeFilesSet = new HashSet<String>();
+        for (ComparableFileName cmpFileName : allBTreeFiles) {
+            int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+            btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
         }
+        validateFiles(dev, btreeFilesSet, allRTreeFiles, rtreeFilter, rtreeFactory);
+        validateFiles(dev, btreeFilesSet, allBloomFilterFiles, bloomFilterFilter, null);
+
         // Sanity check.
         if (allRTreeFiles.size() != allBTreeFiles.size() || allBTreeFiles.size() != allBloomFilterFiles.size()) {
             throw new HyracksDataException(
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
index 6ddf766..872d152 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
@@ -25,7 +25,7 @@
 public class LSMRTreeWithAntiMatterTuplesFileManager extends AbstractLSMIndexFileManager {
 
     public LSMRTreeWithAntiMatterTuplesFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider,
-            FileReference file, TreeIndexFactory<? extends ITreeIndex> rtreeFactory, int startIODeviceIndex) {
-        super(ioManager, fileMapProvider, file, rtreeFactory, startIODeviceIndex);
+            FileReference file, TreeIndexFactory<? extends ITreeIndex> rtreeFactory, int ioDeviceId) {
+        super(ioManager, fileMapProvider, file, rtreeFactory, ioDeviceId);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
index d6efc34..d299bfe 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
@@ -67,23 +67,8 @@
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
             double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
             ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackProvider ioOpCallbackProvider, ILinearizeComparatorFactory linearizeCmpFactory)
-            throws TreeIndexException {
-        return createLSMTree(memBufferCache, memFreePageManager, ioManager, file, diskBufferCache, diskFileMapProvider,
-                typeTraits, rtreeCmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType,
-                bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider,
-                linearizeCmpFactory, 0);
-    }
-
-    public static LSMRTree createLSMTree(IInMemoryBufferCache memBufferCache,
-            IInMemoryFreePageManager memFreePageManager, IIOManager ioManager, FileReference file,
-            IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ITypeTraits[] typeTraits,
-            IBinaryComparatorFactory[] rtreeCmpFactories, IBinaryComparatorFactory[] btreeCmpFactories,
-            IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
-            double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
-            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackProvider ioOpCallbackProvider, ILinearizeComparatorFactory linearizeCmpFactory,
-            int startIODeviceIndex) throws TreeIndexException {
+            int ioDeviceId) throws TreeIndexException {
         LSMTypeAwareTupleWriterFactory rtreeTupleWriterFactory = new LSMTypeAwareTupleWriterFactory(typeTraits, false);
         LSMTypeAwareTupleWriterFactory btreeTupleWriterFactory = new LSMTypeAwareTupleWriterFactory(typeTraits, true);
 
@@ -117,7 +102,7 @@
                 bloomFilterKeyFields);
 
         ILSMIndexFileManager fileNameManager = new LSMRTreeFileManager(ioManager, diskFileMapProvider, file,
-                diskRTreeFactory, diskBTreeFactory, startIODeviceIndex);
+                diskRTreeFactory, diskBTreeFactory, ioDeviceId);
         LSMRTree lsmTree = new LSMRTree(memBufferCache, memFreePageManager, rtreeInteriorFrameFactory,
                 rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory, fileNameManager,
                 diskRTreeFactory, diskBTreeFactory, bloomFilterFactory, bloomFilterFalsePositiveRate,
@@ -133,21 +118,7 @@
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
             ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
             ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
-            ILinearizeComparatorFactory linearizerCmpFactory) throws TreeIndexException {
-        return createLSMTreeWithAntiMatterTuples(memBufferCache, memFreePageManager, ioManager, file, diskBufferCache,
-                diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories, valueProviderFactories,
-                rtreePolicyType, mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider,
-                linearizerCmpFactory, 0);
-    }
-
-    public static LSMRTreeWithAntiMatterTuples createLSMTreeWithAntiMatterTuples(IInMemoryBufferCache memBufferCache,
-            IInMemoryFreePageManager memFreePageManager, IIOManager ioManager, FileReference file,
-            IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ITypeTraits[] typeTraits,
-            IBinaryComparatorFactory[] rtreeCmpFactories, IBinaryComparatorFactory[] btreeCmpFactories,
-            IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
-            ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
-            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
-            ILinearizeComparatorFactory linearizerCmpFactory, int startIODeviceIndex) throws TreeIndexException {
+            ILinearizeComparatorFactory linearizerCmpFactory, int ioDeviceId) throws TreeIndexException {
 
         LSMRTreeTupleWriterFactory rtreeTupleWriterFactory = new LSMRTreeTupleWriterFactory(typeTraits, false);
         LSMRTreeTupleWriterFactory btreeTupleWriterFactory = new LSMRTreeTupleWriterFactory(typeTraits, true);
@@ -184,7 +155,7 @@
                 btreeCmpFactories[btreeCmpFactories.length - 1] };
 
         ILSMIndexFileManager fileNameManager = new LSMRTreeWithAntiMatterTuplesFileManager(ioManager,
-                diskFileMapProvider, file, diskRTreeFactory, startIODeviceIndex);
+                diskFileMapProvider, file, diskRTreeFactory, ioDeviceId);
         LSMRTreeWithAntiMatterTuples lsmTree = new LSMRTreeWithAntiMatterTuples(memBufferCache, memFreePageManager,
                 rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory,
                 fileNameManager, diskRTreeFactory, bulkLoadRTreeFactory, diskFileMapProvider, typeTraits.length,
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
index 5ab9632..f197853 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
@@ -48,6 +48,11 @@
     }
 
     @Override
+    public int getBytesRequriedToWriteTuple(ITupleReference tuple) {
+        return tupleWriter.bytesRequired(tuple) + childPtrSize + slotManager.getSlotSize();
+    }
+
+    @Override
     public int findBestChild(ITupleReference tuple, MultiComparator cmp) {
         int bestChild = rtreePolicy.findBestChildPosition(this, tuple, frameTuple, cmp);
         frameTuple.resetByTupleIndex(this, bestChild);
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
index d52ef16..4057778 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
@@ -30,6 +30,11 @@
     }
 
     @Override
+    public int getBytesRequriedToWriteTuple(ITupleReference tuple) {
+        return tupleWriter.bytesRequired(tuple) + slotManager.getSlotSize();
+    }
+
+    @Override
     public ITreeIndexTupleReference createTupleReference() {
         return tupleWriter.createTupleReference();
     }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index c12dc50..48fb6f1 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -61,12 +61,18 @@
     // Global node sequence number used for the concurrency control protocol
     private final AtomicLong globalNsn;
 
+    private final int maxTupleSize;
+
     public RTree(IBufferCache bufferCache, IFileMapProvider fileMapProvider, IFreePageManager freePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory leafFrameFactory,
             IBinaryComparatorFactory[] cmpFactories, int fieldCount, FileReference file) {
         super(bufferCache, fileMapProvider, freePageManager, interiorFrameFactory, leafFrameFactory, cmpFactories,
                 fieldCount, file);
         globalNsn = new AtomicLong();
+        ITreeIndexFrame leafFrame = leafFrameFactory.createFrame();
+        ITreeIndexFrame interiorFrame = interiorFrameFactory.createFrame();
+        maxTupleSize = Math.min(leafFrame.getMaxTupleSize(bufferCache.getPageSize()),
+                interiorFrame.getMaxTupleSize(bufferCache.getPageSize()));
     }
 
     private long incrementGlobalNsn() {
@@ -147,6 +153,12 @@
     private void insert(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
             TreeIndexException {
         RTreeOpContext ctx = (RTreeOpContext) ictx;
+        int tupleSize = Math.max(ctx.leafFrame.getBytesRequriedToWriteTuple(tuple),
+                ctx.interiorFrame.getBytesRequriedToWriteTuple(tuple));
+        if (tupleSize > maxTupleSize) {
+            throw new TreeIndexException("Record size (" + tupleSize + ") larger than maximum acceptable record size ("
+                    + maxTupleSize + ")");
+        }
         ctx.reset();
         ctx.setTuple(tuple);
         ctx.splitKey.reset();
@@ -614,7 +626,7 @@
         ctx.splitKey.reset();
         ctx.splitKey.getLeftTuple().setFieldCount(cmpFactories.length);
 
-        // We delete the first matching tuple (including the payload data.
+        // We delete the first matching tuple (including the payload data).
         // We don't update the MBRs of the parents after deleting the record.
         int tupleIndex = findTupleToDelete(ctx);
 
@@ -870,8 +882,15 @@
         }
 
         @Override
-        public void add(ITupleReference tuple) throws HyracksDataException {
+        public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
             try {
+                int tupleSize = Math.max(leafFrame.getBytesRequriedToWriteTuple(tuple),
+                        interiorFrame.getBytesRequriedToWriteTuple(tuple));
+                if (tupleSize > maxTupleSize) {
+                    throw new TreeIndexException("Space required for record (" + tupleSize
+                            + ") larger than maximum acceptable size (" + maxTupleSize + ")");
+                }
+
                 NodeFrontier leafFrontier = nodeFrontiers.get(0);
 
                 int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/ILocalResourceRepository.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/ILocalResourceRepository.java
index ab9ec41..36aa088 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/ILocalResourceRepository.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/ILocalResourceRepository.java
@@ -19,16 +19,16 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface ILocalResourceRepository {
-	
+
     public LocalResource getResourceById(long id) throws HyracksDataException;
 
     public LocalResource getResourceByName(String name) throws HyracksDataException;
 
-    public void insert(LocalResource resource) throws HyracksDataException;
+    public void insert(LocalResource resource, int ioDeviceId) throws HyracksDataException;
 
-    public void deleteResourceById(long id) throws HyracksDataException;
+    public void deleteResourceById(long id, int ioDeviceId) throws HyracksDataException;
 
-    public void deleteResourceByName(String name) throws HyracksDataException;
+    public void deleteResourceByName(String name, int ioDeviceId) throws HyracksDataException;
 
     public List<LocalResource> getAllResources() throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientLocalResourceRepository.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientLocalResourceRepository.java
index 55bd807..f853a36 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientLocalResourceRepository.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientLocalResourceRepository.java
@@ -37,7 +37,7 @@
     }
 
     @Override
-    public synchronized void insert(LocalResource resource) throws HyracksDataException {
+    public synchronized void insert(LocalResource resource, int ioDeviceId) throws HyracksDataException {
         long id = resource.getResourceId();
 
         if (id2ResourceMap.containsKey(id)) {
@@ -48,7 +48,7 @@
     }
 
     @Override
-    public synchronized void deleteResourceById(long id) throws HyracksDataException {
+    public synchronized void deleteResourceById(long id, int ioDeviceId) throws HyracksDataException {
         LocalResource resource = id2ResourceMap.get(id);
         if (resource == null) {
             throw new HyracksDataException("Resource doesn't exist");
@@ -58,7 +58,7 @@
     }
 
     @Override
-    public synchronized void deleteResourceByName(String name) throws HyracksDataException {
+    public synchronized void deleteResourceByName(String name, int ioDeviceId) throws HyracksDataException {
         LocalResource resource = name2ResourceMap.get(name);
         if (resource == null) {
             throw new HyracksDataException("Resource doesn't exist");
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
index f93e9b6..fdb4341 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
@@ -225,7 +225,10 @@
         int p1y = rnd.nextInt();
         int p2x = rnd.nextInt();
         int p2y = rnd.nextInt();
-        String data = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
+        String data = "";
+        for (int i = 0; i < 210; i++) {
+            data += "X";
+        }
         TupleUtils.createTuple(tb, tuple, fieldSerdes, Math.min(p1x, p2x), Math.min(p1y, p2y), Math.max(p1x, p2x),
                 Math.max(p1y, p2y), data);
         indexAccessor.insert(tuple);
@@ -261,7 +264,10 @@
         p1y = rnd.nextInt();
         p2x = rnd.nextInt();
         p2y = rnd.nextInt();
-        data = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
+        data = "";
+        for (int i = 0; i < 210; i++) {
+            data += "X";
+        }
         TupleUtils.createTuple(tb, tuple, fieldSerdes, Math.min(p1x, p2x), Math.min(p1y, p2y), Math.max(p1x, p2x),
                 Math.max(p1y, p2y), data);
         indexAccessor.insert(tuple);
@@ -270,7 +276,10 @@
         p1y = rnd.nextInt();
         p2x = rnd.nextInt();
         p2y = rnd.nextInt();
-        data = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
+        data = "";
+        for (int i = 0; i < 210; i++) {
+            data += "X";
+        }
         TupleUtils.createTuple(tb, tuple, fieldSerdes, Math.min(p1x, p2x), Math.min(p1y, p2y), Math.max(p1x, p2x),
                 Math.max(p1y, p2y), data);
         indexAccessor.insert(tuple);
@@ -340,7 +349,10 @@
         int p1y = rnd.nextInt();
         int p2x = rnd.nextInt();
         int p2y = rnd.nextInt();
-        String data = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
+        String data = "";
+        for (int i = 0; i < 210; i++) {
+            data += "X";
+        }
         TupleUtils.createTuple(tb, tuple, fieldSerdes, Math.min(p1x, p2x), Math.min(p1y, p2y), Math.max(p1x, p2x),
                 Math.max(p1y, p2y), data);
         indexAccessor.insert(tuple);
@@ -349,7 +361,10 @@
         p1y = rnd.nextInt();
         p2x = rnd.nextInt();
         p2y = rnd.nextInt();
-        data = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
+        data = "";
+        for (int i = 0; i < 210; i++) {
+            data += "X";
+        }
         TupleUtils.createTuple(tb, tuple, fieldSerdes, Math.min(p1x, p2x), Math.min(p1y, p2y), Math.max(p1x, p2x),
                 Math.max(p1y, p2y), data);
         indexAccessor.insert(tuple);
@@ -376,7 +391,10 @@
         p1y = rnd.nextInt();
         p2x = rnd.nextInt();
         p2y = rnd.nextInt();
-        data = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
+        data = "";
+        for (int i = 0; i < 210; i++) {
+            data += "X";
+        }
         TupleUtils.createTuple(tb, tuple, fieldSerdes, Math.min(p1x, p2x), Math.min(p1y, p2y), Math.max(p1x, p2x),
                 Math.max(p1y, p2y), data);
         indexAccessor.insert(tuple);
@@ -385,7 +403,10 @@
         p1y = rnd.nextInt();
         p2x = rnd.nextInt();
         p2y = rnd.nextInt();
-        data = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
+        data = "";
+        for (int i = 0; i < 210; i++) {
+            data += "X";
+        }
         TupleUtils.createTuple(tb, tuple, fieldSerdes, Math.min(p1x, p2x), Math.min(p1y, p2y), Math.max(p1x, p2x),
                 Math.max(p1y, p2y), data);
         indexAccessor.insert(tuple);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeBulkLoadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeBulkLoadTest.java
index 843d200..a7eee57 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeBulkLoadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeBulkLoadTest.java
@@ -55,7 +55,7 @@
                 harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
                 harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getBoomFilterFalsePositiveRate(),harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeDeleteTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeDeleteTest.java
index eb97c56..a648ff6 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeDeleteTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeDeleteTest.java
@@ -55,7 +55,7 @@
                 harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
                 harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getBoomFilterFalsePositiveRate(),
                 harness.getMergePolicy(), harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
index 76972a5..b5b0b76 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
@@ -39,7 +39,7 @@
                 harness.getDiskFileMapProvider(), typeTraits, cmpFactories, bloomFilterKeyFields,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Before
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeInsertTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeInsertTest.java
index d070f23..875bbd4 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeInsertTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeInsertTest.java
@@ -55,7 +55,7 @@
                 harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
                 harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getBoomFilterFalsePositiveRate(),
                 harness.getMergePolicy(), harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeLifecycleTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeLifecycleTest.java
index 0455c20..43d38bc 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeLifecycleTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeLifecycleTest.java
@@ -47,7 +47,7 @@
                 harness.getDiskFileMapProvider(), fieldSerdes, fieldSerdes.length,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
         index = testCtx.getIndex();
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTest.java
index 76fe0b8..0ea2af2 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTest.java
@@ -54,7 +54,7 @@
                 harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
                 harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getBoomFilterFalsePositiveRate(),
                 harness.getMergePolicy(), harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
index 8464e9e..901a31d 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -48,7 +48,7 @@
                 SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 NoOpOperationTrackerFactory.INSTANCE, harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMultiBulkLoadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMultiBulkLoadTest.java
index 3c5060b..ac8290e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMultiBulkLoadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMultiBulkLoadTest.java
@@ -56,7 +56,7 @@
                 harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
                 harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getBoomFilterFalsePositiveRate(),
                 harness.getMergePolicy(), harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index 431f68b..15adc29 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -41,7 +41,7 @@
                 SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 NoOpOperationTrackerFactory.INSTANCE, harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeUpdateTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeUpdateTest.java
index b19e21e..cc2db11 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeUpdateTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeUpdateTest.java
@@ -55,7 +55,7 @@
                 harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
                 harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getBoomFilterFalsePositiveRate(),
                 harness.getMergePolicy(), harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
index de9fe24..d56ff5b 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
@@ -55,7 +55,7 @@
                 harness.getDiskFileMapProvider(), typeTraits, cmpFactories, bloomFilterKeyFields,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 5a9726b..7e54003 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -57,6 +57,7 @@
 
     protected IHyracksTaskContext ctx;
     protected IOManager ioManager;
+    protected int ioDeviceId;
     protected IBufferCache bufferCache;
     protected int lsmtreeFileId;
 
@@ -88,6 +89,7 @@
         TestStorageManagerComponentHolder.init(this.onDiskPageSize, this.onDiskNumPages, MAX_OPEN_FILES);
         bufferCache = TestStorageManagerComponentHolder.getBufferCache(ctx);
         ioManager = TestStorageManagerComponentHolder.getIOManager();
+        ioDeviceId = 0;
         IFileMapProvider fmp = TestStorageManagerComponentHolder.getFileMapProvider(ctx);
 
         IInMemoryBufferCache memBufferCache = new InMemoryBufferCache(new HeapBufferAllocator(), inMemPageSize,
@@ -97,7 +99,8 @@
         this.ioScheduler = SynchronousScheduler.INSTANCE;
         lsmtree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file, bufferCache, fmp,
                 typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, NoMergePolicy.INSTANCE,
-                ThreadCountingOperationTrackerFactory.INSTANCE, ioScheduler, NoOpIOOperationCallback.INSTANCE);
+                ThreadCountingOperationTrackerFactory.INSTANCE, ioScheduler, NoOpIOOperationCallback.INSTANCE,
+                ioDeviceId);
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
index f0380e9..45cf417 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
@@ -69,7 +69,7 @@
             IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ISerializerDeserializer[] fieldSerdes,
             int numKeyFields, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
             ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackProvider ioOpCallbackProvider) throws Exception {
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider, int ioDeviceId) throws Exception {
         ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
         IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes, numKeyFields);
         int[] bloomFilterKeyFields = new int[numKeyFields];
@@ -78,7 +78,8 @@
         }
         LSMBTree lsmTree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file,
                 diskBufferCache, diskFileMapProvider, typeTraits, cmpFactories, bloomFilterKeyFields,
-                bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider);
+                bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider,
+                ioDeviceId);
         LSMBTreeTestContext testCtx = new LSMBTreeTestContext(fieldSerdes, lsmTree);
         return testCtx;
     }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
index 231c8ff..066f57b 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
@@ -66,6 +66,7 @@
     protected final double bloomFilterFalsePositiveRate;
 
     protected IOManager ioManager;
+    protected int ioDeviceId;
     protected IBufferCache diskBufferCache;
     protected IFileMapProvider diskFileMapProvider;
     protected IInMemoryBufferCache memBufferCache;
@@ -121,6 +122,7 @@
                 new TransientFileMapManager());
         memFreePageManager = new InMemoryFreePageManager(memNumPages, new LIFOMetaDataFrameFactory());
         ioManager = TestStorageManagerComponentHolder.getIOManager();
+        ioDeviceId = 0;
         rnd.setSeed(RANDOM_SEED);
     }
 
@@ -172,6 +174,10 @@
         return ioManager;
     }
 
+    public int getIODeviceId() {
+        return ioDeviceId;
+    }
+
     public IBufferCache getDiskBufferCache() {
         return diskBufferCache;
     }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyLSMIndexFileManager.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyLSMIndexFileManager.java
index 69e23bc..d41ddd4 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyLSMIndexFileManager.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyLSMIndexFileManager.java
@@ -32,8 +32,8 @@
 public class DummyLSMIndexFileManager extends AbstractLSMIndexFileManager {
 
     public DummyLSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
-            TreeIndexFactory<? extends ITreeIndex> treeFactory) {
-        super(ioManager, fileMapProvider, file, treeFactory, 0);
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, int ioDeviceId) {
+        super(ioManager, fileMapProvider, file, treeFactory, ioDeviceId);
     }
 
     protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java
index 161f4ce..0c9e658 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java
@@ -48,6 +48,7 @@
     private static final int DEFAULT_PAGE_SIZE = 256;
     private static final int DEFAULT_NUM_PAGES = 100;
     private static final int DEFAULT_MAX_OPEN_FILES = 10;
+    private static final int DEFAULT_IO_DEVICE_ID = 0;
     protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
     protected final static String sep = System.getProperty("file.separator");
     protected IOManager ioManager;
@@ -74,7 +75,7 @@
 
     public void sortOrderTest(boolean testFlushFileName) throws InterruptedException, HyracksDataException {
         ILSMIndexFileManager fileManager = new DummyLSMIndexFileManager(ioManager, fileMapProvider, file,
-                new DummyTreeFactory());
+                new DummyTreeFactory(), DEFAULT_IO_DEVICE_ID);
         LinkedList<String> fileNames = new LinkedList<String>();
 
         int numFileNames = 100;
@@ -115,7 +116,7 @@
 
     public void cleanInvalidFilesTest(IOManager ioManager) throws InterruptedException, IOException, IndexException {
         ILSMIndexFileManager fileManager = new DummyLSMIndexFileManager(ioManager, fileMapProvider, file,
-                new DummyTreeFactory());
+                new DummyTreeFactory(), DEFAULT_IO_DEVICE_ID);
         fileManager.createDirs();
 
         List<FileReference> flushFiles = new ArrayList<FileReference>();
@@ -187,20 +188,18 @@
                     .getFile().getName());
         }
 
-        // Make sure invalid files were removed from all IODevices.
+        // Make sure invalid files were removed from the IODevices.
         ArrayList<String> remainingFiles = new ArrayList<String>();
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
-            File dir = new File(dev.getPath(), baseDir);
-            FilenameFilter filter = new FilenameFilter() {
-                public boolean accept(File dir, String name) {
-                    return !name.startsWith(".");
-                }
-            };
-            String[] files = dir.list(filter);
-            for (String file : files) {
-                File f = new File(file);
-                remainingFiles.add(f.getName());
+        File dir = new File(ioManager.getIODevices().get(DEFAULT_IO_DEVICE_ID).getPath(), baseDir);
+        FilenameFilter filter = new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return !name.startsWith(".");
             }
+        };
+        String[] files = dir.list(filter);
+        for (String file : files) {
+            File f = new File(file);
+            remainingFiles.add(f.getName());
         }
 
         Collections.sort(remainingFiles, fileManager.getFileNameComparator());
@@ -218,34 +217,19 @@
         cleanDirs(singleDeviceIOManager);
     }
 
-    @Test
-    public void twoIODevicesTest() throws InterruptedException, IOException, IndexException {
-        IOManager twoDevicesIOManager = createIOManager(2);
-        cleanInvalidFilesTest(twoDevicesIOManager);
-        cleanDirs(twoDevicesIOManager);
-    }
-
-    @Test
-    public void fourIODevicesTest() throws InterruptedException, IOException, IndexException {
-        IOManager fourDevicesIOManager = createIOManager(4);
-        cleanInvalidFilesTest(fourDevicesIOManager);
-        cleanDirs(fourDevicesIOManager);
-    }
-
     private void cleanDirs(IOManager ioManager) {
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
-            File dir = new File(dev.getPath(), baseDir);
-            FilenameFilter filter = new FilenameFilter() {
-                public boolean accept(File dir, String name) {
-                    return !name.startsWith(".");
-                }
-            };
-            String[] files = dir.list(filter);
-            for (String file : files) {
-                File f = new File(file);
-                f.delete();
+        File dir = new File(ioManager.getIODevices().get(DEFAULT_IO_DEVICE_ID).getPath(), baseDir);
+        FilenameFilter filter = new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return !name.startsWith(".");
             }
+        };
+        String[] files = dir.list(filter);
+        for (String file : files) {
+            File f = new File(file);
+            f.delete();
         }
+
     }
 
     private IOManager createIOManager(int numDevices) throws HyracksException {
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
index 9c617a1..bf2d5e8 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
@@ -60,6 +60,7 @@
     protected final double bloomFilterFalsePositiveRate;
 
     protected IOManager ioManager;
+    protected int ioDeviceId;
     protected IBufferCache diskBufferCache;
     protected IFileMapProvider diskFileMapProvider;
     protected IInMemoryBufferCache memBufferCache;
@@ -116,6 +117,7 @@
         memBufferCache.open();
         memFreePageManager = new DualIndexInMemoryFreePageManager(memNumPages, new LIFOMetaDataFrameFactory());
         ioManager = TestStorageManagerComponentHolder.getIOManager();
+        ioDeviceId = 0;
         rnd.setSeed(RANDOM_SEED);
         invIndexFileRef = ioManager.getIODevices().get(0).createFileReference(onDiskDir + invIndexFileName);
     }
@@ -173,6 +175,10 @@
         return ioManager;
     }
 
+    public int getIODeviceId() {
+        return ioDeviceId;
+    }
+
     public IBufferCache getDiskBufferCache() {
         return diskBufferCache;
     }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
index a88a88c..d560ffc 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
@@ -148,7 +148,7 @@
                         harness.getDiskBufferCache(), harness.getIOManager(), harness.getOnDiskDir(),
                         harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                         harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                        harness.getIOOperationCallbackProvider());
+                        harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
                 break;
             }
             case PARTITIONED_LSM: {
@@ -158,7 +158,7 @@
                         harness.getDiskBufferCache(), harness.getIOManager(), harness.getOnDiskDir(),
                         harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                         harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                        harness.getIOOperationCallbackProvider());
+                        harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
                 break;
             }
             default: {
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeBulkLoadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeBulkLoadTest.java
index 4dffcdd..edd24b4 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeBulkLoadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeBulkLoadTest.java
@@ -59,7 +59,7 @@
                 harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeDeleteTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeDeleteTest.java
index 039c1e5..360c02c 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeDeleteTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeDeleteTest.java
@@ -59,7 +59,7 @@
                 harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
index 8ebb0e4..0c68fa6 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
@@ -43,7 +43,7 @@
                 valueProviderFactories, rtreePolicyType, harness.getBoomFilterFalsePositiveRate(),
                 harness.getMergePolicy(), harness.getOperationTrackerFactory(), harness.getIOScheduler(),
                 harness.getIOOperationCallbackProvider(),
-                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length));
+                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length), harness.getIODeviceId());
     }
 
     @Before
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeInsertTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeInsertTest.java
index 0cceece..f39b139 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeInsertTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeInsertTest.java
@@ -59,7 +59,7 @@
                 harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeLifecycleTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeLifecycleTest.java
index 18ccefb..8bee460 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeLifecycleTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeLifecycleTest.java
@@ -57,7 +57,7 @@
                 harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, RTreePolicyType.RTREE,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
         index = testCtx.getIndex();
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
index a2088d1..5d7d31b 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
@@ -58,7 +58,7 @@
                 harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMultiBulkLoadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMultiBulkLoadTest.java
index edcc69e..8ef0a9f 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMultiBulkLoadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMultiBulkLoadTest.java
@@ -59,7 +59,7 @@
                 harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
-                harness.getIOOperationCallbackProvider());
+                harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesBulkLoadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesBulkLoadTest.java
index 81a952d..f778aa9 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesBulkLoadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesBulkLoadTest.java
@@ -58,7 +58,7 @@
                 harness.getMemFreePageManager(), harness.getIOManager(), harness.getFileReference(),
                 harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
                 numKeys, rtreePolicyType, harness.getMergePolicy(), harness.getOperationTrackerFactory(),
-                harness.getIOScheduler(), harness.getIOOperationCallbackProvider());
+                harness.getIOScheduler(), harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
 
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesDeleteTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesDeleteTest.java
index 1ee92d9..44d3d1b 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesDeleteTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesDeleteTest.java
@@ -58,7 +58,7 @@
                 harness.getMemFreePageManager(), harness.getIOManager(), harness.getFileReference(),
                 harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
                 numKeys, rtreePolicyType, harness.getMergePolicy(), harness.getOperationTrackerFactory(),
-                harness.getIOScheduler(), harness.getIOOperationCallbackProvider());
+                harness.getIOScheduler(), harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
 
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesExamplesTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesExamplesTest.java
index 3a2537c..8b12224 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesExamplesTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesExamplesTest.java
@@ -43,7 +43,7 @@
                 btreeCmpFactories, valueProviderFactories, rtreePolicyType, harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
                 harness.getIOOperationCallbackProvider(),
-                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length));
+                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length), harness.getIODeviceId());
     }
 
     @Before
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesInsertTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesInsertTest.java
index 61d5ce7..96ef868 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesInsertTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesInsertTest.java
@@ -58,7 +58,7 @@
                 harness.getMemFreePageManager(), harness.getIOManager(), harness.getFileReference(),
                 harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
                 numKeys, rtreePolicyType, harness.getMergePolicy(), harness.getOperationTrackerFactory(),
-                harness.getIOScheduler(), harness.getIOOperationCallbackProvider());
+                harness.getIOScheduler(), harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
 
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesLifecycleTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesLifecycleTest.java
index aee8670..012559d 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesLifecycleTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesLifecycleTest.java
@@ -56,7 +56,7 @@
                 harness.getMemFreePageManager(), harness.getIOManager(), harness.getFileReference(),
                 harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
                 numKeys, RTreePolicyType.RTREE, harness.getMergePolicy(), harness.getOperationTrackerFactory(),
-                harness.getIOScheduler(), harness.getIOOperationCallbackProvider());
+                harness.getIOScheduler(), harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
         index = testCtx.getIndex();
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMergeTest.java
index d5fecbf..1c67cd0 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMergeTest.java
@@ -57,7 +57,7 @@
                 harness.getMemFreePageManager(), harness.getIOManager(), harness.getFileReference(),
                 harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
                 numKeys, rtreePolicyType, harness.getMergePolicy(), harness.getOperationTrackerFactory(),
-                harness.getIOScheduler(), harness.getIOOperationCallbackProvider());
+                harness.getIOScheduler(), harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMultiBulkLoadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMultiBulkLoadTest.java
index de5f065..fe61afd 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMultiBulkLoadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMultiBulkLoadTest.java
@@ -58,7 +58,7 @@
                 harness.getMemFreePageManager(), harness.getIOManager(), harness.getFileReference(),
                 harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
                 numKeys, rtreePolicyType, harness.getMergePolicy(), harness.getOperationTrackerFactory(),
-                harness.getIOScheduler(), harness.getIOOperationCallbackProvider());
+                harness.getIOScheduler(), harness.getIOOperationCallbackProvider(), harness.getIODeviceId());
 
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
index fc8fd3e..1d2271c 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
@@ -63,7 +63,7 @@
                 valueProviderFactories, rtreePolicyType, harness.getBoomFilterFalsePositiveRate(),
                 harness.getMergePolicy(), harness.getOperationTrackerFactory(), harness.getIOScheduler(),
                 harness.getIOOperationCallbackProvider(),
-                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length));
+                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length), harness.getIODeviceId());
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesMultiThreadTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesMultiThreadTest.java
index af73676..aa5023d 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesMultiThreadTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesMultiThreadTest.java
@@ -63,7 +63,7 @@
                 btreeCmpFactories, valueProviderFactories, rtreePolicyType, harness.getMergePolicy(),
                 harness.getOperationTrackerFactory(), harness.getIOScheduler(),
                 harness.getIOOperationCallbackProvider(),
-                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length));
+                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length), harness.getIODeviceId());
 
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
index b341752..cd2b31f 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
@@ -74,7 +74,7 @@
             IPrimitiveValueProviderFactory[] valueProviderFactories, int numKeyFields, RTreePolicyType rtreePolicyType,
             double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
             ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackProvider ioOpCallbackProvider) throws Exception {
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider, int ioDeviceId) throws Exception {
         ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
         IBinaryComparatorFactory[] rtreeCmpFactories = SerdeUtils
                 .serdesToComparatorFactories(fieldSerdes, numKeyFields);
@@ -84,7 +84,7 @@
                 diskBufferCache, diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories,
                 valueProviderFactories, rtreePolicyType, bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory,
                 ioScheduler, ioOpCallbackProvider,
-                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length));
+                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length), ioDeviceId);
         LSMRTreeTestContext testCtx = new LSMRTreeTestContext(fieldSerdes, lsmTree);
         return testCtx;
     }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
index 2270526..4c2e83b 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
@@ -62,6 +62,7 @@
     protected final double bloomFilterFalsePositiveRate;
 
     protected IOManager ioManager;
+    protected int ioDeviceId;
     protected IBufferCache diskBufferCache;
     protected IFileMapProvider diskFileMapProvider;
     protected IInMemoryBufferCache memBufferCache;
@@ -116,6 +117,7 @@
         memBufferCache = new DualIndexInMemoryBufferCache(new HeapBufferAllocator(), memPageSize, memNumPages);
         memFreePageManager = new DualIndexInMemoryFreePageManager(memNumPages, new LIFOMetaDataFrameFactory());
         ioManager = TestStorageManagerComponentHolder.getIOManager();
+        ioDeviceId = 0;
         rnd.setSeed(RANDOM_SEED);
     }
 
@@ -167,6 +169,10 @@
         return ioManager;
     }
 
+    public int getIODeviceId() {
+        return ioDeviceId;
+    }
+
     public IBufferCache getDiskBufferCache() {
         return diskBufferCache;
     }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeWithAntiMatterTuplesTestContext.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeWithAntiMatterTuplesTestContext.java
index 9ddad45..1740ff0 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeWithAntiMatterTuplesTestContext.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeWithAntiMatterTuplesTestContext.java
@@ -74,7 +74,7 @@
             IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ISerializerDeserializer[] fieldSerdes,
             IPrimitiveValueProviderFactory[] valueProviderFactories, int numKeyFields, RTreePolicyType rtreePolicyType,
             ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
-            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider)
+            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider, int ioDeviceId)
             throws Exception {
         ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
         IBinaryComparatorFactory[] rtreeCmpFactories = SerdeUtils
@@ -85,7 +85,7 @@
                 memFreePageManager, ioManager, file, diskBufferCache, diskFileMapProvider, typeTraits,
                 rtreeCmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType, mergePolicy,
                 opTrackerFactory, ioScheduler, ioOpCallbackProvider,
-                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length));
+                LSMRTreeUtils.proposeBestLinearizer(typeTraits, rtreeCmpFactories.length), ioDeviceId);
         LSMRTreeWithAntiMatterTuplesTestContext testCtx = new LSMRTreeWithAntiMatterTuplesTestContext(fieldSerdes,
                 lsmTree);
         return testCtx;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index b9d1d74..9d23ca8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -230,10 +230,11 @@
         /**
          * add the delete operator to delete vertexes
          */
+        int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -453,10 +454,11 @@
         /**
          * add the delete operator to delete vertexes
          */
+        int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index bc5ab14..250d245 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -213,10 +213,11 @@
         /**
          * add the delete operator to delete vertexes
          */
+        int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -414,9 +415,10 @@
         /**
          * add the delete operator to delete vertexes
          */
+        int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index a47101d..f796dd7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -205,10 +205,11 @@
         /**
          * add the delete operator to delete vertexes
          */
+        int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -391,10 +392,11 @@
         /**
          * add the delete operator to delete vertexes
          */
+        int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 96cfaba..cb6c215 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -219,10 +219,11 @@
         /**
          * add the delete operator to delete vertexes
          */
+        int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -424,10 +425,11 @@
         /**
          * add the delete operator to delete vertexes
          */
+        int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */