Added client API to retrieve node controller information

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@840 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
index 1377e37..90bbfb1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
@@ -16,6 +16,7 @@
 
 import java.io.File;
 import java.util.EnumSet;
+import java.util.Map;
 
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -88,4 +89,9 @@
     public void waitForCompletion(JobId jobId) throws Exception {
         hci.waitForCompletion(jobId);
     }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+        return hci.getNodeControllersInfo();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 5563655..37a746b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -16,6 +16,7 @@
 
 import java.io.File;
 import java.util.EnumSet;
+import java.util.Map;
 
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -98,7 +99,7 @@
     public void start(JobId jobId) throws Exception;
 
     /**
-     * Waits untile the specified job has completed, either successfully or has
+     * Waits until the specified job has completed, either successfully or has
      * encountered a permanent failure.
      * 
      * @param jobId
@@ -106,4 +107,11 @@
      * @throws Exception
      */
     public void waitForCompletion(JobId jobId) throws Exception;
+
+    /**
+     * Gets a map of node controller names to node information.
+     * 
+     * @return Map of node name to node information.
+     */
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index f57ac97..6e1eedc 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -16,6 +16,7 @@
 
 import java.rmi.Remote;
 import java.util.EnumSet;
+import java.util.Map;
 
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -37,4 +38,6 @@
     public void start(JobId jobId) throws Exception;
 
     public void waitForCompletion(JobId jobId) throws Exception;
+
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
new file mode 100644
index 0000000..1424a63
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+public class NodeControllerInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String nodeId;
+
+    private final InetAddress ipAddress;
+
+    private final NodeStatus status;
+
+    public NodeControllerInfo(String nodeId, InetAddress ipAddress, NodeStatus status) {
+        this.nodeId = nodeId;
+        this.ipAddress = ipAddress;
+        this.status = status;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public InetAddress getIpAddress() {
+        return ipAddress;
+    }
+
+    public NodeStatus getStatus() {
+        return status;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java
new file mode 100644
index 0000000..d6b99d0
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+public enum NodeStatus {
+    ALIVE,
+    DEAD
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
index 77694b4..70e85a1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
@@ -3,9 +3,11 @@
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
 import java.util.EnumSet;
+import java.util.Map;
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -58,4 +60,9 @@
     public void waitForCompletion(JobId jobId) throws Exception {
         ccs.waitForCompletion(jobId);
     }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+        return ccs.getNodeControllersInfo();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index eaa057b..7c1f790 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -33,6 +33,7 @@
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -47,6 +48,7 @@
 import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusConditionVariableWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
+import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.JobCreateWork;
 import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
 import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -199,10 +201,6 @@
         return nodeRegistry;
     }
 
-    public Map<String, Set<String>> getIPAddressNodeNameMap() {
-        return ipAddressNodeNameMap;
-    }
-
     public CCConfig getConfig() {
         return ccConfig;
     }
@@ -250,8 +248,7 @@
     }
 
     @Override
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details)
-            throws Exception {
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
         TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, details);
         workQueue.schedule(tfe);
     }
@@ -302,14 +299,14 @@
 
     @Override
     public void destroyApplication(String appName) throws Exception {
-        FutureValue fv = new FutureValue();
+        FutureValue<Object> fv = new FutureValue<Object>();
         workQueue.schedule(new ApplicationDestroyWork(this, appName, fv));
         fv.get();
     }
 
     @Override
     public void startApplication(final String appName) throws Exception {
-        FutureValue fv = new FutureValue();
+        FutureValue<Object> fv = new FutureValue<Object>();
         workQueue.schedule(new ApplicationStartWork(this, appName, fv));
         fv.get();
     }
@@ -320,6 +317,13 @@
     }
 
     @Override
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
+        workQueue.schedule(new GetNodeControllersInfoWork(this, fv));
+        return fv.get();
+    }
+
+    @Override
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) {
         workQueue.schedule(new RegisterPartitionAvailibilityWork(this, partitionDescriptor));
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
index de55847..c6af1b9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
@@ -29,9 +29,9 @@
 public class ApplicationDestroyWork extends AbstractWork {
     private final ClusterControllerService ccs;
     private final String appName;
-    private FutureValue fv;
+    private FutureValue<Object> fv;
 
-    public ApplicationDestroyWork(ClusterControllerService ccs, String appName, FutureValue fv) {
+    public ApplicationDestroyWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
         this.ccs = ccs;
         this.appName = appName;
         this.fv = fv;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
index a65703b..2022c7e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
@@ -30,9 +30,9 @@
 public class ApplicationStartWork extends AbstractWork {
     private final ClusterControllerService ccs;
     private final String appName;
-    private final FutureValue fv;
+    private final FutureValue<Object> fv;
 
-    public ApplicationStartWork(ClusterControllerService ccs, String appName, FutureValue fv) {
+    public ApplicationStartWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
         this.ccs = ccs;
         this.appName = appName;
         this.fv = fv;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
new file mode 100644
index 0000000..48d9b84
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.client.NodeStatus;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+
+public class GetNodeControllersInfoWork extends AbstractWork {
+    private final ClusterControllerService ccs;
+    private FutureValue<Map<String, NodeControllerInfo>> fv;
+
+    public GetNodeControllersInfoWork(ClusterControllerService ccs, FutureValue<Map<String, NodeControllerInfo>> fv) {
+        this.ccs = ccs;
+        this.fv = fv;
+    }
+
+    @Override
+    public void run() {
+        Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
+        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+        for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), e.getValue().getDataPort().getIpAddress(),
+                    NodeStatus.ALIVE));
+        }
+        fv.setValue(result);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index 392708b..15daff5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -14,9 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -40,13 +38,5 @@
             throw new Exception("Node with this name already registered.");
         }
         nodeMap.put(nodeId, state);
-        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
-        String ipAddress = state.getNCConfig().dataIPAddress;
-        Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
-        if (nodes == null) {
-            nodes = new HashSet<String>();
-            ipAddressNodeNameMap.put(ipAddress, nodes);
-        }
-        nodes.add(nodeId);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 3575f81..7255503 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -47,20 +47,11 @@
             }
         }
         Set<JobId> affectedJobIds = new HashSet<JobId>();
-        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
         for (String deadNode : deadNodes) {
             NodeControllerState state = nodeMap.remove(deadNode);
 
             // Deal with dead tasks.
             affectedJobIds.addAll(state.getActiveJobIds());
-
-            String ipAddress = state.getNCConfig().dataIPAddress;
-            Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
-            if (ipNodes != null) {
-                if (ipNodes.remove(deadNode) && ipNodes.isEmpty()) {
-                    ipAddressNodeNameMap.remove(ipAddress);
-                }
-            }
         }
         int size = affectedJobIds.size();
         if (size > 0) {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
index 0e4ccc0..00565b6 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
@@ -14,10 +14,10 @@
  */
 package edu.uci.ics.hyracks.control.common.work;
 
-public class FutureValue {
+public class FutureValue<T> {
     private boolean done;
 
-    private Object value;
+    private T value;
 
     private Exception e;
 
@@ -27,7 +27,7 @@
         e = null;
     }
 
-    public synchronized void setValue(Object value) {
+    public synchronized void setValue(T value) {
         done = true;
         this.value = value;
         e = null;
@@ -48,7 +48,7 @@
         notifyAll();
     }
 
-    public synchronized Object get() throws Exception {
+    public synchronized T get() throws Exception {
         while (!done) {
             wait();
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 373d8b3..f535f3e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -351,10 +351,10 @@
         @Override
         public void run() {
             try {
-                FutureValue fv = new FutureValue();
+                FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
                 BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
                 queue.scheduleAndSync(bjpw);
-                List<JobProfile> profiles = (List<JobProfile>) fv.get();
+                List<JobProfile> profiles = fv.get();
                 if (!profiles.isEmpty()) {
                     cc.reportProfile(id, profiles);
                 }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
index 0aee348..574bc6d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -17,7 +17,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
@@ -28,13 +27,11 @@
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 
 public class BuildJobProfilesWork extends SynchronizableWork {
-    private static final Logger LOGGER = Logger.getLogger(BuildJobProfilesWork.class.getName());
-
     private final NodeControllerService ncs;
 
-    private final FutureValue fv;
+    private final FutureValue<List<JobProfile>> fv;
 
-    public BuildJobProfilesWork(NodeControllerService ncs, FutureValue fv) {
+    public BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) {
         this.ncs = ncs;
         this.fv = fv;
     }
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 883b25e..a5584e0 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -40,7 +40,7 @@
     private static final int MAX_VICTIMIZATION_TRY_COUNT = 3;
 
     private final int maxOpenFiles;
-    
+
     private final IIOManager ioManager;
     private final int pageSize;
     private final int numPages;
@@ -54,7 +54,8 @@
     private boolean closed;
 
     public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
-            IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
+            IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize,
+            int numPages, int maxOpenFiles) {
         this.ioManager = ioManager;
         this.pageSize = pageSize;
         this.numPages = numPages;
@@ -91,21 +92,21 @@
         if (closed) {
             throw new HyracksDataException("pin called on a closed cache");
         }
-        
+
         // check whether file has been created and opened
         int fileId = BufferedFileHandle.getFileId(dpid);
         BufferedFileHandle fInfo = fileInfoMap.get(fileId);
-        if(fInfo == null) {
+        if (fInfo == null) {
             throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created.");
-        } else if(fInfo.getReferenceCount() <= 0) {
+        } else if (fInfo.getReferenceCount() <= 0) {
             throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened.");
         }
     }
-    
-    @Override 
-    public ICachedPage tryPin(long dpid) throws HyracksDataException {        
+
+    @Override
+    public ICachedPage tryPin(long dpid) throws HyracksDataException {
         pinSanityCheck(dpid);
-        
+
         CachedPage cPage = null;
         int hash = hash(dpid);
         CacheBucket bucket = pageMap[hash];
@@ -123,17 +124,20 @@
         } finally {
             bucket.bucketLock.unlock();
         }
-        
+
         return cPage;
     }
-    
+
     @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
         pinSanityCheck(dpid);
-        
+
         CachedPage cPage = findPage(dpid, newPage);
         if (cPage == null) {
-        	throw new HyracksDataException("Failed to pin page because all pages are pinned.");
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(dumpState());
+            }
+            throw new HyracksDataException("Failed to pin page because all pages are pinned.");
         }
         if (!newPage) {
             if (!cPage.valid) {
@@ -550,7 +554,7 @@
     }
 
     @Override
-    public void close() {        
+    public void close() {
         closed = true;
         synchronized (cleanerThread) {
             cleanerThread.shutdownStart = true;
@@ -562,22 +566,22 @@
                     e.printStackTrace();
                 }
             }
-        }    
-        
+        }
+
         synchronized (fileInfoMap) {
             try {
-                for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
-                	boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
-                    sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);                            
+                for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+                    boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
+                    sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);
                     if (!fileHasBeenDeleted) {
-                    	ioManager.close(entry.getValue().getFileHandle());
+                        ioManager.close(entry.getValue().getFileHandle());
                     }
                 }
-            } catch(HyracksDataException e) {
+            } catch (HyracksDataException e) {
                 e.printStackTrace();
             }
             fileInfoMap.clear();
-        }        
+        }
     }
 
     @Override
@@ -599,18 +603,18 @@
             BufferedFileHandle fInfo;
             fInfo = fileInfoMap.get(fileId);
             if (fInfo == null) {
-                
+
                 // map is full, make room by removing cleaning up unreferenced files
                 boolean unreferencedFileFound = true;
-                while(fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {                
-                    unreferencedFileFound = false;                    
-                    for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
-                        if(entry.getValue().getReferenceCount() <= 0) {
+                while (fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
+                    unreferencedFileFound = false;
+                    for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+                        if (entry.getValue().getReferenceCount() <= 0) {
                             int entryFileId = entry.getKey();
                             boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
-                            sweepAndFlush(entryFileId, !fileHasBeenDeleted);                            
+                            sweepAndFlush(entryFileId, !fileHasBeenDeleted);
                             if (!fileHasBeenDeleted) {
-                            	ioManager.close(entry.getValue().getFileHandle());
+                                ioManager.close(entry.getValue().getFileHandle());
                             }
                             fileInfoMap.remove(entryFileId);
                             unreferencedFileFound = true;
@@ -619,11 +623,12 @@
                         }
                     }
                 }
-                
-                if(fileInfoMap.size() >= maxOpenFiles) {
-                    throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files " + maxOpenFiles + " already opened and referenced.");
+
+                if (fileInfoMap.size() >= maxOpenFiles) {
+                    throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files "
+                            + maxOpenFiles + " already opened and referenced.");
                 }
-                
+
                 // create, open, and map new file reference
                 FileReference fileRef = fileMapManager.lookupFileName(fileId);
                 FileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
@@ -634,7 +639,7 @@
             fInfo.incReferenceCount();
         }
     }
-        
+
     private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
         for (int i = 0; i < pageMap.length; ++i) {
             CacheBucket bucket = pageMap[i];
@@ -667,17 +672,18 @@
         }
     }
 
-    private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages) throws HyracksDataException {
+    private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages)
+            throws HyracksDataException {
         if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
             int pinCount;
-        	if (cPage.dirty.get()) {
-				if (flushDirtyPages) {
-					write(cPage);
-				}
+            if (cPage.dirty.get()) {
+                if (flushDirtyPages) {
+                    write(cPage);
+                }
                 cPage.dirty.set(false);
                 pinCount = cPage.pinCount.decrementAndGet();
             } else {
-            	pinCount = cPage.pinCount.get();
+                pinCount = cPage.pinCount.get();
             }
             if (pinCount != 0) {
                 throw new IllegalStateException("Page is pinned and file is being closed");
@@ -700,7 +706,7 @@
                 throw new HyracksDataException("Closing unopened file");
             }
             if (fInfo.decReferenceCount() < 0) {
-                throw new HyracksDataException("Closed fileId: " + fileId + " more times than it was opened.");                                
+                throw new HyracksDataException("Closed fileId: " + fileId + " more times than it was opened.");
             }
         }
     }
@@ -711,22 +717,22 @@
             LOGGER.info("Deleting file: " + fileId + " in cache: " + this);
         }
         synchronized (fileInfoMap) {
-        	BufferedFileHandle fInfo = null;
-        	try {
-				fInfo = fileInfoMap.get(fileId);
-				if (fInfo != null && fInfo.getReferenceCount() > 0) {
-					throw new HyracksDataException("Deleting open file");
-				}
-			} finally {
-				fileMapManager.unregisterFile(fileId);
-				if (fInfo != null) {
-					// Mark the fInfo as deleted, 
-					// such that when its pages are reclaimed in openFile(),
-					// the pages are not flushed to disk but only invalidates.
-					ioManager.close(fInfo.getFileHandle());
-					fInfo.markAsDeleted();
-				}
-			}       
+            BufferedFileHandle fInfo = null;
+            try {
+                fInfo = fileInfoMap.get(fileId);
+                if (fInfo != null && fInfo.getReferenceCount() > 0) {
+                    throw new HyracksDataException("Deleting open file");
+                }
+            } finally {
+                fileMapManager.unregisterFile(fileId);
+                if (fInfo != null) {
+                    // Mark the fInfo as deleted, 
+                    // such that when its pages are reclaimed in openFile(),
+                    // the pages are not flushed to disk but only invalidates.
+                    ioManager.close(fInfo.getFileHandle());
+                    fInfo.markAsDeleted();
+                }
+            }
         }
     }
 }
\ No newline at end of file