Merge the recent set of changes from fullstack_hyracks_result_distribution into fullstack_asterix_stabilization.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3027 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
index c18d554..530d19c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -44,7 +44,7 @@
 
     @SuppressWarnings("rawtypes")
     public static RecordDescriptor mkRecordDescriptor(IVariableTypeEnvironment env, IOperatorSchema opSchema,
-            JobGenContext context) throws AlgebricksException {        
+            JobGenContext context) throws AlgebricksException {
         ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
         ITypeTraits[] typeTraits = new ITypeTraits[opSchema.getSize()];
         ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
diff --git a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index 6948769..fe24bc9 100644
--- a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -32,105 +32,125 @@
 @SuppressWarnings("rawtypes")

 public class HiveMetaDataProvider<S, T> implements IMetadataProvider<S, T> {

 

-    private Operator fileSink;

-    private Schema outputSchema;

-    private HashMap<S, IDataSource<S>> dataSourceMap;

+	private Operator fileSink;

+	private Schema outputSchema;

+	private HashMap<S, IDataSource<S>> dataSourceMap;

 

-    public HiveMetaDataProvider(Operator fsOp, Schema oi, HashMap<S, IDataSource<S>> map) {

-        fileSink = fsOp;

-        outputSchema = oi;

-        dataSourceMap = map;

-    }

+	public HiveMetaDataProvider(Operator fsOp, Schema oi,

+			HashMap<S, IDataSource<S>> map) {

+		fileSink = fsOp;

+		outputSchema = oi;

+		dataSourceMap = map;

+	}

 

-    @Override

-    public IDataSourceIndex<T, S> findDataSourceIndex(T indexId, S dataSourceId) throws AlgebricksException {

-        return null;

-    }

+	@Override

+	public IDataSourceIndex<T, S> findDataSourceIndex(T indexId, S dataSourceId)

+			throws AlgebricksException {

+		return null;

+	}

 

-    @Override

-    public IDataSource<S> findDataSource(S id) throws AlgebricksException {

-        return dataSourceMap.get(id);

-    }

+	@Override

+	public IDataSource<S> findDataSource(S id) throws AlgebricksException {

+		return dataSourceMap.get(id);

+	}

 

-    @Override

-    public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource) {

-        return true;

-    }

+	@Override

+	public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource) {

+		return true;

+	}

 

-    @Override

-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,

-            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,

-            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)

-            throws AlgebricksException {

+	@Override

+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(

+			IDataSource<S> dataSource, List<LogicalVariable> scanVariables,

+			List<LogicalVariable> projectVariables, boolean projectPushed,

+			IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,

+			JobGenContext context, JobSpecification jobSpec)

+			throws AlgebricksException {

 

-        S desc = dataSource.getId();

-        HiveScanRuntimeGenerator generator = new HiveScanRuntimeGenerator((PartitionDesc) desc);

-        return generator.getRuntimeOperatorAndConstraint(dataSource, scanVariables, projectVariables, projectPushed,

-                context, jobSpec);

-    }

+		S desc = dataSource.getId();

+		HiveScanRuntimeGenerator generator = new HiveScanRuntimeGenerator(

+				(PartitionDesc) desc);

+		return generator.getRuntimeOperatorAndConstraint(dataSource,

+				scanVariables, projectVariables, projectPushed, context,

+				jobSpec);

+	}

 

-    @Override

-    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,

-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {

+	@Override

+	public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(

+			IDataSink sink, int[] printColumns,

+			IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {

 

-        HiveWriteRuntimeGenerator generator = new HiveWriteRuntimeGenerator((FileSinkOperator) fileSink, outputSchema);

-        return generator.getWriterRuntime(inputDesc);

-    }

+		HiveWriteRuntimeGenerator generator = new HiveWriteRuntimeGenerator(

+				(FileSinkOperator) fileSink, outputSchema);

+		return generator.getWriterRuntime(inputDesc);

+	}

 

-    @Override

-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> arg0,

-            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,

-            JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {

-        // TODO Auto-generated method stub

-        return null;

-    }

+	@Override

+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(

+			IDataSource<S> arg0, IOperatorSchema arg1,

+			List<LogicalVariable> arg2, LogicalVariable arg3,

+			RecordDescriptor arg4, JobGenContext arg5, JobSpecification arg6)

+			throws AlgebricksException {

+		// TODO Auto-generated method stub

+		return null;

+	}

 

-    @Override

-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> arg0,

-            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,

-            JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {

-        // TODO Auto-generated method stub

-        return null;

-    }

+	@Override

+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(

+			IDataSource<S> arg0, IOperatorSchema arg1,

+			List<LogicalVariable> arg2, LogicalVariable arg3,

+			RecordDescriptor arg4, JobGenContext arg5, JobSpecification arg6)

+			throws AlgebricksException {

+		// TODO Auto-generated method stub

+		return null;

+	}

 

-    @Override

-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> arg0,

-            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, JobGenContext arg4,

-            JobSpecification arg5) throws AlgebricksException {

-        // TODO Auto-generated method stub

-        return null;

-    }

+	@Override

+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(

+			IDataSource<S> arg0, IOperatorSchema arg1,

+			List<LogicalVariable> arg2, LogicalVariable arg3,

+			JobGenContext arg4, JobSpecification arg5)

+			throws AlgebricksException {

+		// TODO Auto-generated method stub

+		return null;

+	}

 

-    @Override

-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,

-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,

-            JobSpecification spec) throws AlgebricksException {

-        return null;

-    }

+	@Override

+	public IFunctionInfo lookupFunction(FunctionIdentifier arg0) {

+		return new HiveFunctionInfo(arg0, null);

+	}

 

-    @Override

-    public IFunctionInfo lookupFunction(FunctionIdentifier arg0) {

-        return new HiveFunctionInfo(arg0, null);

-    }

+	@Override

+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,

+			int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,

+			JobSpecification spec) throws AlgebricksException {

+		return null;

+	}

 

-    @Override

-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(

-            IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,

-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,

-            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)

-            throws AlgebricksException {

-        // TODO Auto-generated method stub

-        return null;

-    }

+	@Override

+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(

+			IDataSourceIndex<T, S> dataSource,

+			IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,

+			IVariableTypeEnvironment typeEnv,

+			List<LogicalVariable> primaryKeys,

+			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,

+			RecordDescriptor recordDesc, JobGenContext context,

+			JobSpecification spec) throws AlgebricksException {

+		// TODO Auto-generated method stub

+		return null;

+	}

 

-    @Override

-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(

-            IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,

-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,

-            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)

-            throws AlgebricksException {

-        // TODO Auto-generated method stub

-        return null;

-    }

+	@Override

+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(

+			IDataSourceIndex<T, S> dataSource,

+			IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,

+			IVariableTypeEnvironment typeEnv,

+			List<LogicalVariable> primaryKeys,

+			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,

+			RecordDescriptor recordDesc, JobGenContext context,

+			JobSpecification spec) throws AlgebricksException {

+		// TODO Auto-generated method stub

+		return null;

+	}

 

 }

diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index 0087307..4b8a278 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -41,23 +41,17 @@
     public void testSchedulerSimple() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         InputSplit[] fileSplits = new InputSplit[6];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -85,23 +79,17 @@
     public void testSchedulerLargerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -136,23 +124,17 @@
     public void testSchedulerSmallerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -187,23 +169,17 @@
     public void testSchedulerSmallerHDFSOdd() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         InputSplit[] fileSplits = new InputSplit[13];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index 79f0874..ea2af13 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -46,23 +46,17 @@
     public void testSchedulerSimple() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -90,23 +84,17 @@
     public void testSchedulerLargerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -141,23 +129,17 @@
     public void testSchedulerSmallerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -192,23 +174,17 @@
     public void testSchedulerSmallerHDFSOdd() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));