Added client API to retrieve node controller information
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@840 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
index 1377e37..90bbfb1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
@@ -16,6 +16,7 @@
import java.io.File;
import java.util.EnumSet;
+import java.util.Map;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
@@ -88,4 +89,9 @@
public void waitForCompletion(JobId jobId) throws Exception {
hci.waitForCompletion(jobId);
}
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+ return hci.getNodeControllersInfo();
+ }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 5563655..37a746b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -16,6 +16,7 @@
import java.io.File;
import java.util.EnumSet;
+import java.util.Map;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -98,7 +99,7 @@
public void start(JobId jobId) throws Exception;
/**
- * Waits untile the specified job has completed, either successfully or has
+ * Waits until the specified job has completed, either successfully or has
* encountered a permanent failure.
*
* @param jobId
@@ -106,4 +107,11 @@
* @throws Exception
*/
public void waitForCompletion(JobId jobId) throws Exception;
+
+ /**
+ * Gets a map of node controller names to node information.
+ *
+ * @return Map of node name to node information.
+ */
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index f57ac97..6e1eedc 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -16,6 +16,7 @@
import java.rmi.Remote;
import java.util.EnumSet;
+import java.util.Map;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -37,4 +38,6 @@
public void start(JobId jobId) throws Exception;
public void waitForCompletion(JobId jobId) throws Exception;
+
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
new file mode 100644
index 0000000..1424a63
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+public class NodeControllerInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ private final InetAddress ipAddress;
+
+ private final NodeStatus status;
+
+ public NodeControllerInfo(String nodeId, InetAddress ipAddress, NodeStatus status) {
+ this.nodeId = nodeId;
+ this.ipAddress = ipAddress;
+ this.status = status;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public InetAddress getIpAddress() {
+ return ipAddress;
+ }
+
+ public NodeStatus getStatus() {
+ return status;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java
new file mode 100644
index 0000000..d6b99d0
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+public enum NodeStatus {
+ ALIVE,
+ DEAD
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
index 77694b4..70e85a1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
@@ -3,9 +3,11 @@
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.EnumSet;
+import java.util.Map;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -58,4 +60,9 @@
public void waitForCompletion(JobId jobId) throws Exception {
ccs.waitForCompletion(jobId);
}
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ return ccs.getNodeControllersInfo();
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index eaa057b..7c1f790 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -33,6 +33,7 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -47,6 +48,7 @@
import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusConditionVariableWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
+import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
import edu.uci.ics.hyracks.control.cc.work.JobCreateWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -199,10 +201,6 @@
return nodeRegistry;
}
- public Map<String, Set<String>> getIPAddressNodeNameMap() {
- return ipAddressNodeNameMap;
- }
-
public CCConfig getConfig() {
return ccConfig;
}
@@ -250,8 +248,7 @@
}
@Override
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details)
- throws Exception {
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, details);
workQueue.schedule(tfe);
}
@@ -302,14 +299,14 @@
@Override
public void destroyApplication(String appName) throws Exception {
- FutureValue fv = new FutureValue();
+ FutureValue<Object> fv = new FutureValue<Object>();
workQueue.schedule(new ApplicationDestroyWork(this, appName, fv));
fv.get();
}
@Override
public void startApplication(final String appName) throws Exception {
- FutureValue fv = new FutureValue();
+ FutureValue<Object> fv = new FutureValue<Object>();
workQueue.schedule(new ApplicationStartWork(this, appName, fv));
fv.get();
}
@@ -320,6 +317,13 @@
}
@Override
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
+ workQueue.schedule(new GetNodeControllersInfoWork(this, fv));
+ return fv.get();
+ }
+
+ @Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) {
workQueue.schedule(new RegisterPartitionAvailibilityWork(this, partitionDescriptor));
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
index de55847..c6af1b9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
@@ -29,9 +29,9 @@
public class ApplicationDestroyWork extends AbstractWork {
private final ClusterControllerService ccs;
private final String appName;
- private FutureValue fv;
+ private FutureValue<Object> fv;
- public ApplicationDestroyWork(ClusterControllerService ccs, String appName, FutureValue fv) {
+ public ApplicationDestroyWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
this.ccs = ccs;
this.appName = appName;
this.fv = fv;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
index a65703b..2022c7e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
@@ -30,9 +30,9 @@
public class ApplicationStartWork extends AbstractWork {
private final ClusterControllerService ccs;
private final String appName;
- private final FutureValue fv;
+ private final FutureValue<Object> fv;
- public ApplicationStartWork(ClusterControllerService ccs, String appName, FutureValue fv) {
+ public ApplicationStartWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
this.ccs = ccs;
this.appName = appName;
this.fv = fv;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
new file mode 100644
index 0000000..48d9b84
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.client.NodeStatus;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+
+public class GetNodeControllersInfoWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+ private FutureValue<Map<String, NodeControllerInfo>> fv;
+
+ public GetNodeControllersInfoWork(ClusterControllerService ccs, FutureValue<Map<String, NodeControllerInfo>> fv) {
+ this.ccs = ccs;
+ this.fv = fv;
+ }
+
+ @Override
+ public void run() {
+ Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
+ Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+ for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
+ result.put(e.getKey(), new NodeControllerInfo(e.getKey(), e.getValue().getDataPort().getIpAddress(),
+ NodeStatus.ALIVE));
+ }
+ fv.setValue(result);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index 392708b..15daff5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -14,9 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -40,13 +38,5 @@
throw new Exception("Node with this name already registered.");
}
nodeMap.put(nodeId, state);
- Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
- String ipAddress = state.getNCConfig().dataIPAddress;
- Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
- if (nodes == null) {
- nodes = new HashSet<String>();
- ipAddressNodeNameMap.put(ipAddress, nodes);
- }
- nodes.add(nodeId);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 3575f81..7255503 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -47,20 +47,11 @@
}
}
Set<JobId> affectedJobIds = new HashSet<JobId>();
- Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
for (String deadNode : deadNodes) {
NodeControllerState state = nodeMap.remove(deadNode);
// Deal with dead tasks.
affectedJobIds.addAll(state.getActiveJobIds());
-
- String ipAddress = state.getNCConfig().dataIPAddress;
- Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
- if (ipNodes != null) {
- if (ipNodes.remove(deadNode) && ipNodes.isEmpty()) {
- ipAddressNodeNameMap.remove(ipAddress);
- }
- }
}
int size = affectedJobIds.size();
if (size > 0) {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
index 0e4ccc0..00565b6 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
@@ -14,10 +14,10 @@
*/
package edu.uci.ics.hyracks.control.common.work;
-public class FutureValue {
+public class FutureValue<T> {
private boolean done;
- private Object value;
+ private T value;
private Exception e;
@@ -27,7 +27,7 @@
e = null;
}
- public synchronized void setValue(Object value) {
+ public synchronized void setValue(T value) {
done = true;
this.value = value;
e = null;
@@ -48,7 +48,7 @@
notifyAll();
}
- public synchronized Object get() throws Exception {
+ public synchronized T get() throws Exception {
while (!done) {
wait();
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 373d8b3..f535f3e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -351,10 +351,10 @@
@Override
public void run() {
try {
- FutureValue fv = new FutureValue();
+ FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
queue.scheduleAndSync(bjpw);
- List<JobProfile> profiles = (List<JobProfile>) fv.get();
+ List<JobProfile> profiles = fv.get();
if (!profiles.isEmpty()) {
cc.reportProfile(id, profiles);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
index 0aee348..574bc6d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -17,7 +17,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
@@ -28,13 +27,11 @@
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
public class BuildJobProfilesWork extends SynchronizableWork {
- private static final Logger LOGGER = Logger.getLogger(BuildJobProfilesWork.class.getName());
-
private final NodeControllerService ncs;
- private final FutureValue fv;
+ private final FutureValue<List<JobProfile>> fv;
- public BuildJobProfilesWork(NodeControllerService ncs, FutureValue fv) {
+ public BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) {
this.ncs = ncs;
this.fv = fv;
}
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 883b25e..a5584e0 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -40,7 +40,7 @@
private static final int MAX_VICTIMIZATION_TRY_COUNT = 3;
private final int maxOpenFiles;
-
+
private final IIOManager ioManager;
private final int pageSize;
private final int numPages;
@@ -54,7 +54,8 @@
private boolean closed;
public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
- IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
+ IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize,
+ int numPages, int maxOpenFiles) {
this.ioManager = ioManager;
this.pageSize = pageSize;
this.numPages = numPages;
@@ -91,21 +92,21 @@
if (closed) {
throw new HyracksDataException("pin called on a closed cache");
}
-
+
// check whether file has been created and opened
int fileId = BufferedFileHandle.getFileId(dpid);
BufferedFileHandle fInfo = fileInfoMap.get(fileId);
- if(fInfo == null) {
+ if (fInfo == null) {
throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created.");
- } else if(fInfo.getReferenceCount() <= 0) {
+ } else if (fInfo.getReferenceCount() <= 0) {
throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened.");
}
}
-
- @Override
- public ICachedPage tryPin(long dpid) throws HyracksDataException {
+
+ @Override
+ public ICachedPage tryPin(long dpid) throws HyracksDataException {
pinSanityCheck(dpid);
-
+
CachedPage cPage = null;
int hash = hash(dpid);
CacheBucket bucket = pageMap[hash];
@@ -123,17 +124,20 @@
} finally {
bucket.bucketLock.unlock();
}
-
+
return cPage;
}
-
+
@Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
pinSanityCheck(dpid);
-
+
CachedPage cPage = findPage(dpid, newPage);
if (cPage == null) {
- throw new HyracksDataException("Failed to pin page because all pages are pinned.");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(dumpState());
+ }
+ throw new HyracksDataException("Failed to pin page because all pages are pinned.");
}
if (!newPage) {
if (!cPage.valid) {
@@ -550,7 +554,7 @@
}
@Override
- public void close() {
+ public void close() {
closed = true;
synchronized (cleanerThread) {
cleanerThread.shutdownStart = true;
@@ -562,22 +566,22 @@
e.printStackTrace();
}
}
- }
-
+ }
+
synchronized (fileInfoMap) {
try {
- for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
- boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
- sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);
+ for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+ boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
+ sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);
if (!fileHasBeenDeleted) {
- ioManager.close(entry.getValue().getFileHandle());
+ ioManager.close(entry.getValue().getFileHandle());
}
}
- } catch(HyracksDataException e) {
+ } catch (HyracksDataException e) {
e.printStackTrace();
}
fileInfoMap.clear();
- }
+ }
}
@Override
@@ -599,18 +603,18 @@
BufferedFileHandle fInfo;
fInfo = fileInfoMap.get(fileId);
if (fInfo == null) {
-
+
// map is full, make room by removing cleaning up unreferenced files
boolean unreferencedFileFound = true;
- while(fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
- unreferencedFileFound = false;
- for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
- if(entry.getValue().getReferenceCount() <= 0) {
+ while (fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
+ unreferencedFileFound = false;
+ for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+ if (entry.getValue().getReferenceCount() <= 0) {
int entryFileId = entry.getKey();
boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
- sweepAndFlush(entryFileId, !fileHasBeenDeleted);
+ sweepAndFlush(entryFileId, !fileHasBeenDeleted);
if (!fileHasBeenDeleted) {
- ioManager.close(entry.getValue().getFileHandle());
+ ioManager.close(entry.getValue().getFileHandle());
}
fileInfoMap.remove(entryFileId);
unreferencedFileFound = true;
@@ -619,11 +623,12 @@
}
}
}
-
- if(fileInfoMap.size() >= maxOpenFiles) {
- throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files " + maxOpenFiles + " already opened and referenced.");
+
+ if (fileInfoMap.size() >= maxOpenFiles) {
+ throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files "
+ + maxOpenFiles + " already opened and referenced.");
}
-
+
// create, open, and map new file reference
FileReference fileRef = fileMapManager.lookupFileName(fileId);
FileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
@@ -634,7 +639,7 @@
fInfo.incReferenceCount();
}
}
-
+
private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
for (int i = 0; i < pageMap.length; ++i) {
CacheBucket bucket = pageMap[i];
@@ -667,17 +672,18 @@
}
}
- private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages) throws HyracksDataException {
+ private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages)
+ throws HyracksDataException {
if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
int pinCount;
- if (cPage.dirty.get()) {
- if (flushDirtyPages) {
- write(cPage);
- }
+ if (cPage.dirty.get()) {
+ if (flushDirtyPages) {
+ write(cPage);
+ }
cPage.dirty.set(false);
pinCount = cPage.pinCount.decrementAndGet();
} else {
- pinCount = cPage.pinCount.get();
+ pinCount = cPage.pinCount.get();
}
if (pinCount != 0) {
throw new IllegalStateException("Page is pinned and file is being closed");
@@ -700,7 +706,7 @@
throw new HyracksDataException("Closing unopened file");
}
if (fInfo.decReferenceCount() < 0) {
- throw new HyracksDataException("Closed fileId: " + fileId + " more times than it was opened.");
+ throw new HyracksDataException("Closed fileId: " + fileId + " more times than it was opened.");
}
}
}
@@ -711,22 +717,22 @@
LOGGER.info("Deleting file: " + fileId + " in cache: " + this);
}
synchronized (fileInfoMap) {
- BufferedFileHandle fInfo = null;
- try {
- fInfo = fileInfoMap.get(fileId);
- if (fInfo != null && fInfo.getReferenceCount() > 0) {
- throw new HyracksDataException("Deleting open file");
- }
- } finally {
- fileMapManager.unregisterFile(fileId);
- if (fInfo != null) {
- // Mark the fInfo as deleted,
- // such that when its pages are reclaimed in openFile(),
- // the pages are not flushed to disk but only invalidates.
- ioManager.close(fInfo.getFileHandle());
- fInfo.markAsDeleted();
- }
- }
+ BufferedFileHandle fInfo = null;
+ try {
+ fInfo = fileInfoMap.get(fileId);
+ if (fInfo != null && fInfo.getReferenceCount() > 0) {
+ throw new HyracksDataException("Deleting open file");
+ }
+ } finally {
+ fileMapManager.unregisterFile(fileId);
+ if (fInfo != null) {
+ // Mark the fInfo as deleted,
+ // such that when its pages are reclaimed in openFile(),
+ // the pages are not flushed to disk but only invalidates.
+ ioManager.close(fInfo.getFileHandle());
+ fInfo.markAsDeleted();
+ }
+ }
}
}
}
\ No newline at end of file