Divide Cluster into Unique Partitions
The change includes the following:
- Fix passing NC stores to AsterixConfiguration.
- Unify the storage directory name at the instance level rather than the node level.
- Divide the cluster into unique storage partitions based on the number of stores (see the sketch after this list).
- Refactor FileSplits and move them out of AqlMetadataProvider.
- Make AsterixHyracksIntegrationUtil use the passed configuration file.
- Make FileSplits pass relative index paths of partitions rather than absolute paths.
- Remove unused AqlCompiledMetadataDeclarations class.
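
A minimal sketch of the partition assignment this change introduces, using a
simplified stand-in for the new ClusterPartition class; the store layout and
class/variable names here are illustrative, and the real logic lives in
AsterixPropertiesAccessor below:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    public class PartitionAssignmentSketch {
        // minimal stand-in for org.apache.asterix.common.cluster.ClusterPartition
        static class ClusterPartition {
            final int partitionId;
            final String nodeId;
            final int ioDeviceNum;

            ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
                this.partitionId = partitionId;
                this.nodeId = nodeId;
                this.ioDeviceNum = ioDeviceNum;
            }

            @Override
            public String toString() {
                return "ID:" + partitionId + " Original Node: " + nodeId + " IODevice: " + ioDeviceNum;
            }
        }

        public static void main(String[] args) {
            // each NC maps to its configured store directories
            Map<String, String[]> stores = new HashMap<>();
            stores.put("asterix_nc1", new String[] { "iodevice0", "iodevice1" });
            stores.put("asterix_nc2", new String[] { "iodevice0", "iodevice1" });

            SortedMap<Integer, ClusterPartition> clusterPartitions = new TreeMap<>();
            Map<String, ClusterPartition[]> nodePartitionsMap = new HashMap<>();
            int uniquePartitionId = 0;
            for (Map.Entry<String, String[]> entry : stores.entrySet()) {
                ClusterPartition[] nodePartitions = new ClusterPartition[entry.getValue().length];
                for (int i = 0; i < nodePartitions.length; i++) {
                    // one partition per store directory; the id is unique cluster-wide
                    ClusterPartition p = new ClusterPartition(uniquePartitionId++, entry.getKey(), i);
                    clusterPartitions.put(p.partitionId, p);
                    nodePartitions[i] = p;
                }
                nodePartitionsMap.put(entry.getKey(), nodePartitions);
            }
            clusterPartitions.values().forEach(System.out::println);
        }
    }

With two NCs of two store directories each (as in asterix-build-configuration.xml
below), this yields the four unique partitions 0..3.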
Change-Id: I8c7fbca5113dd7ad569a46dfa2591addb5bf8655
Reviewed-on: https://asterix-gerrit.ics.uci.edu/564
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 45c0598..4a8a323 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -187,7 +187,8 @@
String nodeId = ncApplicationContext.getNodeId();
replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
- metadataProperties.getStores().get(nodeId)[0], nodeId, replicationProperties.getReplicationStore());
+ AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
+ replicationProperties.getReplicationStore());
replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -377,7 +378,6 @@
@Override
public void initializeResourceIdFactory() throws HyracksDataException {
- resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext)
- .createResourceIdFactory();
+ resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
}
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 0145651..d7842e8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,11 +19,10 @@
package org.apache.asterix.api.common;
import java.io.File;
-import java.io.IOException;
import java.util.EnumSet;
+import java.util.Set;
import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -41,22 +40,18 @@
public class AsterixHyracksIntegrationUtil {
private static final String IO_DIR_KEY = "java.io.tmpdir";
- public static final int NODES = 2;
- public static final int PARTITONS = 2;
-
public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
-
public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
public static ClusterControllerService cc;
- public static NodeControllerService[] ncs = new NodeControllerService[NODES];
+ public static NodeControllerService[] ncs;
public static IHyracksClientConnection hcc;
- protected static AsterixTransactionProperties txnProperties;
+ private static AsterixPropertiesAccessor propertiesAccessor;
public static void init(boolean deleteOldInstanceData) throws Exception {
- AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
- txnProperties = new AsterixTransactionProperties(apa);
+ propertiesAccessor = new AsterixPropertiesAccessor();
+ ncs = new NodeControllerService[propertiesAccessor.getNodeNames().size()];
if (deleteOldInstanceData) {
deleteTransactionLogs();
removeTestStorageFiles();
@@ -77,7 +72,8 @@
// Starts ncs.
int n = 0;
- for (String ncName : getNcNames()) {
+ Set<String> nodes = propertiesAccessor.getNodeNames();
+ for (String ncName : nodes) {
NCConfig ncConfig1 = new NCConfig();
ncConfig1.ccHost = "localhost";
ncConfig1.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
@@ -87,13 +83,24 @@
ncConfig1.nodeId = ncName;
ncConfig1.resultTTL = 30000;
ncConfig1.resultSweepThreshold = 1000;
- for (int p = 0; p < PARTITONS; ++p) {
+ //get initial partitions from properties
+ String[] nodeStores = propertiesAccessor.getStores().get(ncName);
+ if (nodeStores == null) {
+ throw new Exception("Coudn't find stores for NC: " + ncName);
+ }
+ String tempDirPath = System.getProperty(IO_DIR_KEY);
+ if (!tempDirPath.endsWith(File.separator)) {
+ tempDirPath += File.separator;
+ }
+ for (int p = 0; p < nodeStores.length; p++) {
+ //create IO devices based on stores
+ String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
+ File ioDeviceDir = new File(iodevicePath);
+ ioDeviceDir.mkdirs();
if (p == 0) {
- ncConfig1.ioDevices = System.getProperty("java.io.tmpdir") + File.separator + ncConfig1.nodeId
- + "/iodevice" + p;
+ ncConfig1.ioDevices = iodevicePath;
} else {
- ncConfig1.ioDevices += "," + System.getProperty("java.io.tmpdir") + File.separator
- + ncConfig1.nodeId + "/iodevice" + p;
+ ncConfig1.ioDevices += "," + iodevicePath;
}
}
ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
@@ -105,19 +116,7 @@
}
public static String[] getNcNames() {
- String[] names = new String[NODES];
- for (int n = 0; n < NODES; ++n) {
- names[n] = "asterix_nc" + (n + 1);
- }
- return names;
- }
-
- public static String[] getDataDirs() {
- String[] names = new String[NODES];
- for (int n = 0; n < NODES; ++n) {
- names[n] = "asterix_nc" + (n + 1) + "data";
- }
- return names;
+ return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
}
public static IHyracksClientConnection getHyracksClientConnection() {
@@ -147,17 +146,17 @@
hcc.waitForCompletion(jobId);
}
- private static void removeTestStorageFiles() throws IOException {
+ public static void removeTestStorageFiles() {
File dir = new File(System.getProperty(IO_DIR_KEY));
- for (String ncName : AsterixHyracksIntegrationUtil.getNcNames()) {
+ for (String ncName : propertiesAccessor.getNodeNames()) {
File ncDir = new File(dir, ncName);
FileUtils.deleteQuietly(ncDir);
}
}
private static void deleteTransactionLogs() throws Exception {
- for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
- File log = new File(txnProperties.getLogDirectory(ncId));
+ for (String ncId : propertiesAccessor.getNodeNames()) {
+ File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
if (log.exists()) {
FileUtils.deleteDirectory(log);
}
@@ -185,7 +184,7 @@
try {
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
- init(false);
+ init(true);
while (true) {
Thread.sleep(10000);
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 714e05c..08b92e7 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -1246,7 +1246,6 @@
}
}
jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-
//#. mark PendingDropOp on the dataverse record by
// first, deleting the dataverse record from the DATAVERSE_DATASET
// second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 147a356..496c2f8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -46,11 +46,11 @@
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.storage.AsterixFilesUtil;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
@@ -202,7 +202,6 @@
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMetadataProperties();
-
if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
@@ -213,7 +212,7 @@
PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
.getLocalResourceRepository();
- localResourceRepository.initializeNewUniverse(metadataProperties.getStores().get(nodeId)[0]);
+ localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
}
IAsterixStateProxy proxy = null;
@@ -277,22 +276,18 @@
performLocalCleanUp();
}
- private void performLocalCleanUp() throws IOException {
+ private void performLocalCleanUp() {
//delete working area files from failed jobs
runtimeContext.getIOManager().deleteWorkspaceFiles();
//reclaim storage for temporary datasets.
- PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
- .getLocalResourceRepository();
-
- String[] storageMountingPoints = localResourceRepository.getStorageMountingPoints();
- String storageFolderName = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
- .get(nodeId)[0];
-
- for (String mountPoint : storageMountingPoints) {
- String tempDatasetFolder = mountPoint + storageFolderName + File.separator
- + AqlMetadataProvider.TEMP_DATASETS_STORAGE_FOLDER;
- AsterixFilesUtil.deleteFolder(tempDatasetFolder);
+ //get node stores
+ String[] nodeStores = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
+ .get(nodeId);
+ for (String store : nodeStores) {
+ String tempDatasetFolder = store + File.separator
+ + SplitsAndConstraintsUtil.TEMP_DATASETS_STORAGE_FOLDER;
+ FileUtils.deleteQuietly(new File(tempDatasetFolder));
}
// TODO
@@ -309,7 +304,7 @@
if (cluster == null) {
throw new IllegalStateException("No cluster configuration found for this instance");
}
- String asterixInstanceName = cluster.getInstanceName();
+ String asterixInstanceName = metadataProperties.getInstanceName();
AsterixTransactionProperties txnProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getTransactionProperties();
Node self = null;
@@ -322,8 +317,14 @@
for (Node node : nodes) {
String ncId = asterixInstanceName + "_" + node.getId();
if (ncId.equalsIgnoreCase(nodeId)) {
- String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
- metadataProperties.getStores().put(nodeId, storeDir.split(","));
+ String storeDir = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+ String nodeIoDevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+ String[] ioDevicePaths = nodeIoDevices.trim().split(",");
+ for (int i = 0; i < ioDevicePaths.length; i++) {
+ //construct full store path
+ ioDevicePaths[i] += File.separator + storeDir;
+ }
+ metadataProperties.getStores().put(nodeId, ioDevicePaths);
String coredumpPath = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
metadataProperties.getCoredumpPaths().put(nodeId, coredumpPath);
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index fa20099..731113b 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -20,11 +20,11 @@
<metadataNode>asterix_nc1</metadataNode>
<store>
<ncId>asterix_nc1</ncId>
- <storeDirs>asterix_nc1data</storeDirs>
+ <storeDirs>iodevice0,iodevice1</storeDirs>
</store>
<store>
<ncId>asterix_nc2</ncId>
- <storeDirs>asterix_nc2data</storeDirs>
+ <storeDirs>iodevice0,iodevice1</storeDirs>
</store>
<transactionLogDir>
<ncId>asterix_nc1</ncId>
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
index 376b2ff..6c6e411 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
@@ -65,11 +65,6 @@
if (files == null || files.length == 0) {
outdir.delete();
}
-
- // clean up the files written by the ASTERIX storage manager
- for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
- testExecutor.deleteRec(new File(d));
- }
}
@Parameters
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 052e0be..7a55c90 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -68,10 +68,6 @@
@AfterClass
public static void tearDown() throws Exception {
ExecutionTestUtil.tearDown();
- // clean up the files written by the ASTERIX storage manager
- for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
- testExecutor.deleteRec(new File(d));
- }
}
@Parameters
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index d5f4db3..22a3ad7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -65,10 +65,7 @@
@AfterClass
public static void tearDown() throws Exception {
ExecutionTestUtil.tearDown();
- // clean up the files written by the ASTERIX storage manager
- for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
- testExecutor.deleteRec(new File(d));
- }
+ AsterixHyracksIntegrationUtil.removeTestStorageFiles();
}
@Parameters
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
new file mode 100644
index 0000000..6cd44a7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+public class ClusterPartition implements Cloneable {
+ private final int partitionId;
+ private final String nodeId;
+ private final int ioDeviceNum;
+ private String activeNodeId = null;
+ private boolean active = false;
+
+ public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
+ this.partitionId = partitionId;
+ this.nodeId = nodeId;
+ this.ioDeviceNum = ioDeviceNum;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public int getIODeviceNum() {
+ return ioDeviceNum;
+ }
+
+ public String getActiveNodeId() {
+ return activeNodeId;
+ }
+
+ public void setActiveNodeId(String activeNodeId) {
+ this.activeNodeId = activeNodeId;
+ }
+
+ public void setActive(boolean active) {
+ this.active = active;
+ }
+
+ @Override
+ public ClusterPartition clone() {
+ ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ return clone;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ID:" + partitionId);
+ sb.append(" Original Node: " + nodeId);
+ sb.append(" IODevice: " + ioDeviceNum);
+ sb.append(" Active Node: " + activeNodeId);
+ return sb.toString();
+ }
+
+ public boolean isActive() {
+ return active;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index adaca46..8e2c4e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -20,6 +20,9 @@
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
public class AsterixMetadataProperties extends AbstractAsterixProperties {
@@ -35,8 +38,8 @@
return accessor.getMetadataNodeName();
}
- public String getMetadataStore() {
- return accessor.getMetadataStore();
+ public ClusterPartition getMetadataPartition() {
+ return accessor.getMetadataPartition();
}
public Map<String, String[]> getStores() {
@@ -54,4 +57,12 @@
public Map<String, String> getCoredumpPaths() {
return accessor.getCoredumpConfig();
}
+
+ public Map<String, ClusterPartition[]> getNodePartitions() {
+ return accessor.getNodePartitions();
+ }
+
+ public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+ return accessor.getClusterPartitions();
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index d6c81ab..cc7ec84 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -28,6 +28,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -35,6 +37,7 @@
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.common.configuration.Coredump;
import org.apache.asterix.common.configuration.Property;
@@ -53,12 +56,15 @@
private final Map<String, Property> asterixConfigurationParams;
private final Map<String, String> transactionLogDirs;
private final Map<String, String> asterixBuildProperties;
+ private final Map<String, ClusterPartition[]> nodePartitionsMap;
+ private SortedMap<Integer, ClusterPartition> clusterPartitions;
public AsterixPropertiesAccessor() throws AsterixException {
String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
if (fileName == null) {
fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
}
+
InputStream is = this.getClass().getClassLoader().getResourceAsStream(fileName);
if (is == null) {
try {
@@ -82,9 +88,21 @@
stores = new HashMap<String, String[]>();
List<Store> configuredStores = asterixConfiguration.getStore();
nodeNames = new HashSet<String>();
+ nodePartitionsMap = new HashMap<>();
+ clusterPartitions = new TreeMap<>();
+ int uniquePartitionId = 0;
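+ //one partition per store directory; partition ids are unique across the cluster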
for (Store store : configuredStores) {
String trimmedStoreDirs = store.getStoreDirs().trim();
- stores.put(store.getNcId(), trimmedStoreDirs.split(","));
+ String[] nodeStores = trimmedStoreDirs.split(",");
+ ClusterPartition[] nodePartitions = new ClusterPartition[nodeStores.length];
+ for (int i = 0; i < nodePartitions.length; i++) {
+ ClusterPartition partition = new ClusterPartition(uniquePartitionId++, store.getNcId(), i);
+ clusterPartitions.put(partition.getPartitionId(), partition);
+ nodePartitions[i] = partition;
+ }
+ stores.put(store.getNcId(), nodeStores);
+ nodePartitionsMap.put(store.getNcId(), nodePartitions);
nodeNames.add(store.getNcId());
}
asterixConfigurationParams = new HashMap<String, Property>();
@@ -116,10 +133,6 @@
return metadataNodeName;
}
- public String getMetadataStore() {
- return stores.get(metadataNodeName)[0];
- }
-
public Map<String, String[]> getStores() {
return stores;
}
@@ -172,7 +185,7 @@
}
}
- private <T> void logConfigurationError(Property p, T defaultValue) {
+ private static <T> void logConfigurationError(Property p, T defaultValue) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Invalid property value '" + p.getValue() + "' for property '" + p.getName()
+ "'.\n See the description: \n" + p.getDescription() + "\nDefault = " + defaultValue);
@@ -182,4 +195,17 @@
public String getInstanceName() {
return instanceName;
}
+
+ public ClusterPartition getMetadataPartition() {
+ //metadata partition is always the first partition on the metadata node
+ return nodePartitionsMap.get(metadataNodeName)[0];
+ }
+
+ public Map<String, ClusterPartition[]> getNodePartitions() {
+ return nodePartitionsMap;
+ }
+
+ public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+ return clusterPartitions;
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index adf1152..5062d06 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -78,9 +78,9 @@
}
@Override
- public synchronized IIndex getIndex(String resourceName) throws HyracksDataException {
- int datasetID = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
+ int datasetID = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
return getIndex(datasetID, resourceID);
}
@@ -98,9 +98,9 @@
}
@Override
- public synchronized void register(String resourceName, IIndex index) throws HyracksDataException {
- int did = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
dsInfo = getDatasetInfo(did);
@@ -116,16 +116,16 @@
dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
}
- public int getDIDfromResourceName(String resourceName) throws HyracksDataException {
- LocalResource lr = resourceRepository.getResourceByName(resourceName);
+ public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
if (lr == null) {
return -1;
}
return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
}
- public long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
- LocalResource lr = resourceRepository.getResourceByName(resourceName);
+ public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
if (lr == null) {
return -1;
}
@@ -133,9 +133,9 @@
}
@Override
- public synchronized void unregister(String resourceName) throws HyracksDataException {
- int did = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized void unregister(String resourcePath) throws HyracksDataException {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
DatasetInfo dsInfo = datasetInfos.get(did);
IndexInfo iInfo = dsInfo.indexes.get(resourceID);
@@ -180,9 +180,9 @@
}
@Override
- public synchronized void open(String resourceName) throws HyracksDataException {
- int did = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized void open(String resourcePath) throws HyracksDataException {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null || !dsInfo.isRegistered) {
@@ -262,9 +262,9 @@
}
@Override
- public synchronized void close(String resourceName) throws HyracksDataException {
- int did = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized void close(String resourcePath) throws HyracksDataException {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
@@ -704,9 +704,9 @@
}
@Override
- public synchronized void allocateMemory(String resourceName) throws HyracksDataException {
+ public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
//a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
- int did = Integer.parseInt(resourceName);
+ int did = Integer.parseInt(resourcePath);
allocateDatasetMemory(did);
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 51168ae..fd1ebb8 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -63,7 +63,7 @@
try {
writer.open();
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
- indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
+ indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
if (tupleFilterFactory != null) {
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index d3203d5..872c959 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -113,7 +113,6 @@
<xs:element ref="cl:java_home" minOccurs="0" />
<xs:element ref="cl:log_dir" minOccurs="0" />
<xs:element ref="cl:txn_log_dir" minOccurs="0" />
- <xs:element ref="cl:store" minOccurs="0" />
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:debug_port" minOccurs="0" />
</xs:sequence>
diff --git a/asterix-common/src/main/resources/schema/yarn_cluster.xsd b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
index f54cf90..8827985 100644
--- a/asterix-common/src/main/resources/schema/yarn_cluster.xsd
+++ b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
@@ -138,9 +138,6 @@
ref="cl:txn_log_dir"
minOccurs="0" />
<xs:element
- ref="cl:store"
- minOccurs="0" />
- <xs:element
ref="cl:iodevices"
minOccurs="0" />
<xs:element
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index d04a3dd..d8147b6 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -71,7 +71,7 @@
this.port = 19002;
}
- public TestExecutor(String host, int port){
+ public TestExecutor(String host, int port) {
this.host = host;
this.port = port;
}
@@ -225,12 +225,18 @@
// In future this may be changed depending on the requested
// output format sent to the servlet.
String errorBody = method.getResponseBodyAsString();
- JSONObject result = new JSONObject(errorBody);
- String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
- result.getString("stacktrace") };
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
- throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
- + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+ String[] errors;
+ try {
+ JSONObject result = new JSONObject(errorBody);
+ errors = new String[] { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+ result.getString("stacktrace") };
+ } catch (Exception e) {
+ // the error body is not in the expected JSON format; report it as-is
+ throw new Exception(errorBody);
+ }
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
+ throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
+ + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
}
return statusCode;
}
@@ -303,7 +307,7 @@
}
private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
- final String url = "http://"+host+":"+port+"/query/result";
+ final String url = "http://" + host + ":" + port + "/query/result";
// Create a method instance.
GetMethod method = new GetMethod(url);
@@ -430,9 +434,9 @@
switch (ctx.getType()) {
case "ddl":
if (ctx.getFile().getName().endsWith("aql")) {
- executeDDL(statement, "http://"+host+":"+port+"/ddl");
+ executeDDL(statement, "http://" + host + ":" + port + "/ddl");
} else {
- executeDDL(statement, "http://"+host+":"+port+"/ddl/sqlpp");
+ executeDDL(statement, "http://" + host + ":" + port + "/ddl/sqlpp");
}
break;
case "update":
@@ -442,9 +446,9 @@
"127.0.0.1://../../../../../../asterix-app/");
}
if (ctx.getFile().getName().endsWith("aql")) {
- executeUpdate(statement, "http://"+host+":"+port+"/update");
+ executeUpdate(statement, "http://" + host + ":" + port + "/update");
} else {
- executeUpdate(statement, "http://"+host+":"+port+"/update/sqlpp");
+ executeUpdate(statement, "http://" + host + ":" + port + "/update/sqlpp");
}
break;
case "query":
@@ -461,25 +465,25 @@
OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
if (ctx.getFile().getName().endsWith("aql")) {
if (ctx.getType().equalsIgnoreCase("query")) {
- resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query",
- cUnit.getParameter());
+ resultStream = executeQuery(statement, fmt,
+ "http://" + host + ":" + port + "/query", cUnit.getParameter());
} else if (ctx.getType().equalsIgnoreCase("async")) {
resultStream = executeAnyAQLAsync(statement, false, fmt,
- "http://"+host+":"+port+"/aql");
+ "http://" + host + ":" + port + "/aql");
} else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
resultStream = executeAnyAQLAsync(statement, true, fmt,
- "http://"+host+":"+port+"/aql");
+ "http://" + host + ":" + port + "/aql");
}
} else {
if (ctx.getType().equalsIgnoreCase("query")) {
- resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query/sqlpp",
- cUnit.getParameter());
+ resultStream = executeQuery(statement, fmt,
+ "http://" + host + ":" + port + "/query/sqlpp", cUnit.getParameter());
} else if (ctx.getType().equalsIgnoreCase("async")) {
resultStream = executeAnyAQLAsync(statement, false, fmt,
- "http://"+host+":"+port+"/sqlpp");
+ "http://" + host + ":" + port + "/sqlpp");
} else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
resultStream = executeAnyAQLAsync(statement, true, fmt,
- "http://"+host+":"+port+"/sqlpp");
+ "http://" + host + ":" + port + "/sqlpp");
}
}
@@ -506,7 +510,7 @@
break;
case "txnqbc": //qbc represents query before crash
resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
- "http://"+host+":"+port+"/query", cUnit.getParameter());
+ "http://" + host + ":" + port + "/query", cUnit.getParameter());
qbcFile = new File(actualPath + File.separator
+ testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
+ cUnit.getName() + "_qbc.adm");
@@ -515,7 +519,7 @@
break;
case "txnqar": //qar represents query after recovery
resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
- "http://"+host+":"+port+"/query", cUnit.getParameter());
+ "http://" + host + ":" + port + "/query", cUnit.getParameter());
qarFile = new File(actualPath + File.separator
+ testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
+ cUnit.getName() + "_qar.adm");
@@ -528,7 +532,7 @@
break;
case "txneu": //eu represents erroneous update
try {
- executeUpdate(statement, "http://"+host+":"+port+"/update");
+ executeUpdate(statement, "http://" + host + ":" + port + "/update");
} catch (Exception e) {
//An exception is expected.
failed = true;
@@ -556,7 +560,7 @@
break;
case "errddl": // a ddlquery that expects error
try {
- executeDDL(statement, "http://"+host+":"+port+"/ddl");
+ executeDDL(statement, "http://" + host + ":" + port + "/ddl");
} catch (Exception e) {
// expected error happens
failed = true;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index c92262c..29765fd 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
public class EventDriver {
public static final String CLIENT_NODE_ID = "client_node";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
private static String eventsDir;
private static Events events;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index d6e7da0..b83faa2 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -191,7 +191,7 @@
String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
.getMasterNode().getJavaHome();
return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
- null, null, null, cluster.getMasterNode().getDebugPort());
+ null, null, cluster.getMasterNode().getDebugPort());
}
List<Node> nodeList = cluster.getNode();
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 33ba787..4bd5098 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -45,7 +45,6 @@
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
-import org.apache.commons.io.IOUtils;
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.common.configuration.Coredump;
import org.apache.asterix.common.configuration.Store;
@@ -59,6 +58,7 @@
import org.apache.asterix.event.schema.cluster.Env;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.event.schema.cluster.Property;
+import org.apache.commons.io.IOUtils;
public class AsterixEventServiceUtil {
@@ -90,8 +90,8 @@
return instance;
}
- public static void createAsterixZip(AsterixInstance asterixInstance) throws IOException, InterruptedException,
- JAXBException, EventException {
+ public static void createAsterixZip(AsterixInstance asterixInstance)
+ throws IOException, InterruptedException, JAXBException, EventException {
String asterixInstanceDir = asterixInstanceDir(asterixInstance);
unzip(AsterixEventService.getAsterixZip(), asterixInstanceDir);
@@ -128,18 +128,18 @@
clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, ccJavaOpts));
clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, ncJavaOpts));
- clusterProperties.add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator
- + "asterix"));
+ clusterProperties
+ .add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator + "asterix"));
clusterProperties.add(new Property("LOG_DIR", cluster.getLogDir()));
clusterProperties.add(new Property("JAVA_HOME", cluster.getJavaHome()));
clusterProperties.add(new Property("WORKING_DIR", cluster.getWorkingDir().getDir()));
clusterProperties.add(new Property("CLIENT_NET_IP", cluster.getMasterNode().getClientIp()));
clusterProperties.add(new Property("CLUSTER_NET_IP", cluster.getMasterNode().getClusterIp()));
- int clusterNetPort = cluster.getMasterNode().getClusterPort() != null ? cluster.getMasterNode()
- .getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
- int clientNetPort = cluster.getMasterNode().getClientPort() != null ? cluster.getMasterNode().getClientPort()
- .intValue() : CLIENT_NET_PORT_DEFAULT;
+ int clusterNetPort = cluster.getMasterNode().getClusterPort() != null
+ ? cluster.getMasterNode().getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
+ int clientNetPort = cluster.getMasterNode().getClientPort() != null
+ ? cluster.getMasterNode().getClientPort().intValue() : CLIENT_NET_PORT_DEFAULT;
int httpPort = cluster.getMasterNode().getHttpPort() != null ? cluster.getMasterNode().getHttpPort().intValue()
: HTTP_PORT_DEFAULT;
@@ -151,8 +151,8 @@
}
private static String asterixZipName() {
- return AsterixEventService.getAsterixZip().substring(
- AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+ return AsterixEventService.getAsterixZip()
+ .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
}
private static String asterixJarPath(AsterixInstance asterixInstance, String asterixInstanceDir) {
@@ -174,8 +174,8 @@
new File(asterixInstanceDir + File.separator + ASTERIX_CONFIGURATION_FILE).delete();
}
- private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir, AsterixInstance asterixInstance)
- throws IOException, EventException, JAXBException {
+ private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir,
+ AsterixInstance asterixInstance) throws IOException, EventException, JAXBException {
File sourceJar = new File(asterixJarPath(asterixInstance, asterixInstanceDir));
writeAsterixClusterConfigurationFile(asterixInstance);
@@ -185,8 +185,8 @@
new File(asterixInstanceDir + File.separator + CLUSTER_CONFIGURATION_FILE).delete();
}
- private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance) throws IOException,
- EventException, JAXBException {
+ private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance)
+ throws IOException, EventException, JAXBException {
String asterixInstanceName = asterixInstance.getName();
Cluster cluster = asterixInstance.getCluster();
@@ -197,8 +197,8 @@
+ asterixInstanceName + File.separator + "cluster.xml"));
}
- public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName,
- String libraryName, String libraryPath) throws IOException {
+ public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName, String libraryName,
+ String libraryPath) throws IOException {
File instanceDir = new File(asterixInstanceDir(asterixInstance));
if (!instanceDir.exists()) {
instanceDir.mkdirs();
@@ -235,30 +235,8 @@
return metadataNode;
}
- public static String getNodeDirectories(String asterixInstanceName, Node node, Cluster cluster) {
- String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
- String[] storeDirs = null;
- StringBuffer nodeDataStore = new StringBuffer();
- String storeDirValue = node.getStore();
- if (storeDirValue == null) {
- storeDirValue = cluster.getStore();
- if (storeDirValue == null) {
- throw new IllegalStateException(" Store not defined for node " + node.getId());
- }
- storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
- }
-
- storeDirs = storeDirValue.split(",");
- for (String ns : storeDirs) {
- nodeDataStore.append(ns + File.separator + storeDataSubDir.trim());
- nodeDataStore.append(",");
- }
- nodeDataStore.deleteCharAt(nodeDataStore.length() - 1);
- return nodeDataStore.toString();
- }
-
- private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance) throws IOException,
- JAXBException {
+ private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance)
+ throws IOException, JAXBException {
String asterixInstanceName = asterixInstance.getName();
Cluster cluster = asterixInstance.getCluster();
String metadataNodeId = asterixInstance.getMetadataNodeId();
@@ -266,29 +244,34 @@
AsterixConfiguration configuration = asterixInstance.getAsterixConfiguration();
configuration.setInstanceName(asterixInstanceName);
configuration.setMetadataNode(asterixInstanceName + "_" + metadataNodeId);
- String storeDir = null;
List<Store> stores = new ArrayList<Store>();
+ String storeDir = cluster.getStore().trim();
for (Node node : cluster.getNode()) {
- storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
- stores.add(new Store(asterixInstanceName + "_" + node.getId(), storeDir));
+ String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+ String[] nodeIdDevice = iodevices.split(",");
+ StringBuilder nodeStores = new StringBuilder();
+ for (int i = 0; i < nodeIdDevice.length; i++) {
+ nodeStores.append(nodeIdDevice[i] + File.separator + storeDir + ",");
+ }
+ //remove last comma
+ nodeStores.deleteCharAt(nodeStores.length() - 1);
+ stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeStores.toString()));
}
configuration.setStore(stores);
-
List<Coredump> coredump = new ArrayList<Coredump>();
String coredumpDir = null;
List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
String txnLogDir = null;
for (Node node : cluster.getNode()) {
coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
- coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir + File.separator
- + asterixInstanceName + "_" + node.getId()));
+ coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(),
+ coredumpDir + File.separator + asterixInstanceName + "_" + node.getId()));
txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
txnLogDirs.add(new TransactionLogDir(asterixInstanceName + "_" + node.getId(), txnLogDir));
}
configuration.setCoredump(coredump);
configuration.setTransactionLogDir(txnLogDirs);
-
File asterixConfDir = new File(AsterixEventService.getAsterixDir() + File.separator + asterixInstanceName);
asterixConfDir.mkdirs();
@@ -300,8 +283,6 @@
os.close();
}
-
-
public static void unzip(String sourceFile, String destDir) throws IOException {
BufferedOutputStream dest = null;
FileInputStream fis = new FileInputStream(sourceFile);
@@ -432,8 +413,8 @@
}
}
if (!valid) {
- throw new EventException("Asterix instance by the name " + name + " is in " + instance.getState()
- + " state ");
+ throw new EventException(
+ "Asterix instance by the name " + name + " is in " + instance.getState() + " state ");
}
return instance;
}
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b6aaddb..6085019 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -164,11 +164,11 @@
String store;
String pargs;
String iodevices;
+ store = cluster.getStore();
List<Pattern> patternList = new ArrayList<Pattern>();
for (Node node : cluster.getNode()) {
Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
- store = node.getStore() == null ? cluster.getStore() : node.getStore();
pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
+ AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
+ backupId + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
@@ -188,12 +188,12 @@
String txnLogDir;
String store;
String pargs;
+ store = cluster.getStore();
List<Pattern> patternList = new ArrayList<Pattern>();
for (Node node : cluster.getNode()) {
Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
txnLogDir = node.getTxnLogDir() == null ? instance.getCluster().getTxnLogDir() : node.getTxnLogDir();
- store = node.getStore() == null ? cluster.getStore() : node.getStore();
pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
+ AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + txnLogDir + " " + backupId + " " + backupDir
+ " " + "local" + " " + node.getId();
@@ -212,14 +212,12 @@
VerificationUtil.verifyBackupRestoreConfiguration(hdfsUrl, hadoopVersion, hdfsBackupDir);
String workingDir = cluster.getWorkingDir().getDir();
int backupId = backupInfo.getId();
- String nodeStore;
String pargs;
List<Pattern> patternList = new ArrayList<Pattern>();
for (Node node : cluster.getNode()) {
Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
- nodeStore = node.getStore() == null ? clusterStore : node.getStore();
- pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+ pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
+ AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
+ backupId + " " + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
+ hadoopVersion;
@@ -235,14 +233,12 @@
String backupDir = backupInfo.getBackupConf().getBackupDir();
String workingDir = cluster.getWorkingDir().getDir();
int backupId = backupInfo.getId();
- String nodeStore;
String pargs;
List<Pattern> patternList = new ArrayList<Pattern>();
for (Node node : cluster.getNode()) {
Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
- nodeStore = node.getStore() == null ? clusterStore : node.getStore();
- pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+ pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
+ AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
+ backupId + " " + backupDir + " " + "local" + " " + node.getId();
Event event = new Event("restore", nodeid, pargs);
@@ -262,8 +258,8 @@
Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
- String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp()
- + " " + workingDir;
+ String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp() + " "
+ + workingDir;
Event event = new Event("directory_transfer", nodeid, pargs);
Pattern p = new Pattern(null, 1, null, event);
addInitialDelay(p, 2, "sec");
@@ -428,8 +424,8 @@
patternList.add(p);
}
- pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir
- + " " + "unpack";
+ pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir + " "
+ + "unpack";
event = new Event("file_transfer", nodeid, pargs);
p = new Pattern(null, 1, null, event);
patternList.add(p);
@@ -529,8 +525,8 @@
String[] nodeIODevices;
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
nodeIODevices = iodevices.trim().split(",");
+ String nodeStore = cluster.getStore().trim();
for (String nodeIODevice : nodeIODevices) {
- String nodeStore = node.getStore() == null ? cluster.getStore() : node.getStore();
pargs = nodeIODevice.trim() + File.separator + nodeStore;
Event event = new Event("file_delete", nodeid, pargs);
patternList.add(new Pattern(null, 1, null, event));
@@ -540,13 +536,15 @@
return patterns;
}
- private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp, String destDir) {
+ private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp,
+ String destDir) {
Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
- String asterixZipName = AsterixEventService.getAsterixZip().substring(
- AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
- String fileToTransfer = new File(AsterixEventService.getAsterixDir() + File.separator + instanceName
- + File.separator + asterixZipName).getAbsolutePath();
+ String asterixZipName = AsterixEventService.getAsterixZip()
+ .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+ String fileToTransfer = new File(
+ AsterixEventService.getAsterixDir() + File.separator + instanceName + File.separator + asterixZipName)
+ .getAbsolutePath();
String pargs = username + " " + fileToTransfer + " " + destinationIp + " " + destDir + " " + "unpack";
Event event = new Event("file_transfer", nodeid, pargs);
return new Pattern(null, 1, null, event);
@@ -607,8 +605,8 @@
ps.add(p);
nodeid = new Nodeid(new Value(null, nodeToBeAdded.getId()));
- pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp()
- + " " + workingDir;
+ pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp() + " "
+ + workingDir;
event = new Event("directory_transfer", nodeid, pargs);
p = new Pattern(null, 1, null, event);
addInitialDelay(p, 2, "sec");
@@ -626,8 +624,8 @@
String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
String srcHost = cluster.getMasterNode().getClientIp();
Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
- String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
- .getLogDir();
+ String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
+ : cluster.getMasterNode().getLogDir();
String destDir = outputDir + File.separator + "cc";
String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
Event event = new Event("directory_copy", nodeid, pargs);
@@ -649,7 +647,7 @@
Patterns patterns = new Patterns(patternList);
return patterns;
}
-
+
private Patterns createRemoveAsterixReplicationPattern(AsterixInstance instance) throws Exception {
List<Pattern> patternList = new ArrayList<Pattern>();
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
index ebf41cc..c4a96f4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -34,7 +34,6 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import org.apache.hadoop.fs.BlockLocation;
@@ -211,12 +210,10 @@
Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
for (String i : stores.keySet()) {
String[] nodeStores = stores.get(i);
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
- locs.add(i);
- }
+ //two readers per partition
+ locs.add(i);
+ locs.add(i);
}
}
String[] cluster = new String[locs.size()];
@@ -273,67 +270,67 @@
* @throws IOException
*/
protected InputSplit[] getSplits(JobConf conf) throws IOException {
- // Create file system object
- FileSystem fs = FileSystem.get(conf);
ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
- // Create files splits
- for (ExternalFile file : files) {
- Path filePath = new Path(file.getFileName());
- FileStatus fileStatus;
- try {
- fileStatus = fs.getFileStatus(filePath);
- } catch (FileNotFoundException e) {
- // file was deleted at some point, skip to next file
- continue;
- }
- if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
- && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
- // Get its information from HDFS name node
- BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
- // Create a split per block
- for (BlockLocation block : fileBlocks) {
- if (block.getOffset() < file.getSize()) {
- fileSplits
- .add(new FileSplit(filePath,
- block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
- ? block.getLength() : (file.getSize() - block.getOffset()),
- block.getHosts()));
- orderedExternalFiles.add(file);
- }
+ // Create file system object
+ try (FileSystem fs = FileSystem.get(conf)) {
+ // Create files splits
+ for (ExternalFile file : files) {
+ Path filePath = new Path(file.getFileName());
+ FileStatus fileStatus;
+ try {
+ fileStatus = fs.getFileStatus(filePath);
+ } catch (FileNotFoundException e) {
+ // file was deleted at some point, skip to next file
+ continue;
}
- } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
- && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
- long oldSize = 0L;
- long newSize = file.getSize();
- for (int i = 0; i < files.size(); i++) {
- if (files.get(i).getFileName() == file.getFileName() && files.get(i).getSize() != file.getSize()) {
- newSize = files.get(i).getSize();
- oldSize = file.getSize();
- break;
- }
- }
-
- // Get its information from HDFS name node
- BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
- // Create a split per block
- for (BlockLocation block : fileBlocks) {
- if (block.getOffset() + block.getLength() > oldSize) {
- if (block.getOffset() < newSize) {
- // Block interact with delta -> Create a split
- long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
- long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
- : block.getOffset() + block.getLength() - newSize;
- long splitLength = block.getLength() - startCut - endCut;
- fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+ if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
+ && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+ // Get its information from HDFS name node
+ BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
+ // Create a split per block
+ for (BlockLocation block : fileBlocks) {
+ if (block.getOffset() < file.getSize()) {
+ fileSplits.add(new FileSplit(filePath,
+ block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
+ ? block.getLength() : (file.getSize() - block.getOffset()),
block.getHosts()));
orderedExternalFiles.add(file);
}
}
+ } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
+ && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+ long oldSize = 0L;
+ long newSize = file.getSize();
+ for (int i = 0; i < files.size(); i++) {
+ if (files.get(i).getFileName().equals(file.getFileName())
+ && files.get(i).getSize() != file.getSize()) {
+ newSize = files.get(i).getSize();
+ oldSize = file.getSize();
+ break;
+ }
+ }
+
+ // Get its information from HDFS name node
+ BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
+ // Create a split per block
+ for (BlockLocation block : fileBlocks) {
+ if (block.getOffset() + block.getLength() > oldSize) {
+ if (block.getOffset() < newSize) {
+ // Block interacts with the delta -> create a split
+ long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
+ long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
+ : block.getOffset() + block.getLength() - newSize;
+ long splitLength = block.getLength() - startCut - endCut;
+ fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+ block.getHosts()));
+ orderedExternalFiles.add(file);
+ }
+ }
+ }
}
}
}
- fs.close();
files = orderedExternalFiles;
return fileSplits.toArray(new FileSplit[fileSplits.size()]);
}
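
Note on the getSplits() rewrite above: wrapping the FileSystem in try-with-resources closes it on every exit path, whereas the old trailing fs.close() was skipped whenever an exception escaped the loop. A minimal, self-contained Java sketch of the same pattern (the class and file names here are illustrative, not part of this change):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class TryWithResourcesSketch {
    // Before: a trailing close() is skipped if readLine() throws,
    // leaking the underlying file handle.
    static String firstLineLeaky(String path) throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader(path));
        String line = reader.readLine(); // an exception here skips close()
        reader.close();
        return line;
    }

    // After: the reader is closed on every exit path, including exceptions,
    // which is what the getSplits() rewrite does with the FileSystem.
    static String firstLineSafe(String path) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            return reader.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(firstLineSafe(args[0]));
    }
}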
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 11e2b96..8bf6d93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -32,7 +32,6 @@
import org.apache.asterix.om.types.AUnionType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
@@ -186,11 +185,8 @@
Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
for (String i : stores.keySet()) {
String[] nodeStores = stores.get(i);
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
- }
+ locs.add(i);
}
}
String[] cluster = new String[locs.size()];
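
Same counting change as in HDFSAdapterFactory: the scan locations now include each node once per store (one store maps to one storage partition), with the per-iodevice multiplier dropped. A tiny standalone sketch of the new count (the stores map below is made up for illustration):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocationCountSketch {
    public static void main(String[] args) {
        // Hypothetical stores map: node id -> store directories on that node.
        Map<String, String[]> stores = new HashMap<>();
        stores.put("nc1", new String[] { "storeA", "storeB" });
        stores.put("nc2", new String[] { "storeA" });

        // One scan location per store; iodevices no longer multiply the count.
        List<String> locs = new ArrayList<>();
        for (Map.Entry<String, String[]> e : stores.entrySet()) {
            for (int j = 0; j < e.getValue().length; j++) {
                locs.add(e.getKey());
            }
        }
        System.out.println(locs); // e.g. [nc1, nc1, nc2]
    }
}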
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
index bce4620..6ff991b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.indexing.operators;
-import java.io.File;
import java.util.List;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -49,9 +48,7 @@
@Override
protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
- .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
- .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+ FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
fileManager.deleteTransactionFiles();
}
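
The abort, commit, and recover operators in this area all switch to IndexFileNameUtil.getIndexAbsoluteFileRef, whose body is outside this diff. Below is a plausible reconstruction from its call sites and from the IIOManager.getAbsoluteFileRef call used in MetadataBootstrap further down; treat the exact signature as an assumption (the real helper receives the IndexInfoOperatorDescriptor and reads the split provider from it, which is inlined here to keep the sketch self-contained):

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;

public class IndexFileNameUtilSketch {
    // Plausible body: file splits now carry paths relative to the io device
    // root, so resolve the partition's split against its device through the
    // IOManager instead of concatenating absolute paths by hand.
    public static FileReference getIndexAbsoluteFileRef(IFileSplitProvider splitProvider, int partition,
            IIOManager ioManager) throws HyracksDataException {
        FileSplit split = splitProvider.getFileSplits()[partition];
        return ioManager.getAbsoluteFileRef(split.getIODeviceId(), split.getLocalFile().getFile().getPath());
    }
}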
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
index 0eacc15..e89a8db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -49,18 +50,15 @@
@Override
protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- System.err.println("performing the operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
- .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
- .getFileSplits()[partition].getIODeviceId()));
+ FileReference resourcePath = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
+ System.err.println("performing the operation on " + resourcePath.getFile().getAbsolutePath());
// Get DataflowHelper
IIndexDataflowHelper indexHelper = indexDataflowHelperFactory.createIndexDataflowHelper(fileIndexInfo, ctx, partition);
// Get index
IIndex index = indexHelper.getIndexInstance();
// commit transaction
((ITwoPCIndex) index).commitTransaction();
- System.err.println("operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
- .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
- .getFileSplits()[partition].getIODeviceId()) + " Succeded");
+ System.err.println("operation on "+ resourecePath.getFile().getAbsolutePath() + " Succeded");
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
index 8e7a288..9bdfaa6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
@@ -49,9 +49,7 @@
@Override
protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
- .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
- .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+ FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
fileManager.recoverTransaction();
}
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 2254f6f..09c65c8 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -27,8 +27,6 @@
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
-import org.kohsuke.args4j.Option;
-
import org.apache.asterix.event.management.EventUtil;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.MasterNode;
@@ -37,6 +35,7 @@
import org.apache.asterix.installer.driver.InstallerDriver;
import org.apache.asterix.installer.schema.conf.Configuration;
import org.apache.asterix.installer.schema.conf.Zookeeper;
+import org.kohsuke.args4j.Option;
public class ValidateCommand extends AbstractCommand {
@@ -97,7 +96,7 @@
valid = false;
} else {
cluster = EventUtil.getCluster(clusterPath);
- validateClusterProperties(cluster);
+ valid = valid & validateClusterProperties(cluster);
Set<String> servers = new HashSet<String>();
Set<String> serverIds = new HashSet<String>();
@@ -106,7 +105,7 @@
MasterNode masterNode = cluster.getMasterNode();
Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
- masterNode.getLogDir(), null, null, null, null);
+ masterNode.getLogDir(), null, null, null);
ipAddresses.add(masterNode.getClusterIp());
valid = valid & validateNodeConfiguration(master, cluster);
@@ -158,7 +157,7 @@
return true;
}
- private void validateClusterProperties(Cluster cluster) {
+ private boolean validateClusterProperties(Cluster cluster) {
List<String> tempDirs = new ArrayList<String>();
if (cluster.getLogDir() != null && checkTemporaryPath(cluster.getLogDir())) {
tempDirs.add("Log directory: " + cluster.getLogDir());
@@ -176,6 +175,11 @@
LOGGER.warn(msg);
}
+ if (cluster.getStore() == null || cluster.getStore().length() == 0) {
+ LOGGER.fatal("store not defined at cluster" + ERROR);
+ return false;
+ }
+ return true;
}
private boolean validateNodeConfiguration(Node node, Cluster cluster) {
@@ -201,14 +205,6 @@
}
}
- if (node.getStore() == null || node.getStore().length() == 0) {
- if (!cluster.getMasterNode().getId().equals(node.getId())
- && (cluster.getStore() == null || cluster.getStore().length() == 0)) {
- valid = false;
- LOGGER.fatal("store not defined at cluster/node level for node: " + node.getId() + ERROR);
- }
- }
-
if (node.getIodevices() == null || node.getIodevices().length() == 0) {
if (!cluster.getMasterNode().getId().equals(node.getId())
&& (cluster.getIodevices() == null || cluster.getIodevices().length() == 0)) {
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
index 56da6ee..1ac60ba 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
@@ -48,14 +48,11 @@
String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
String[] storeDirs = null;
StringBuffer nodeDataStore = new StringBuffer();
- String storeDirValue = node.getStore();
+ String storeDirValue = cluster.getStore();
if (storeDirValue == null) {
- storeDirValue = cluster.getStore();
- if (storeDirValue == null) {
- throw new IllegalStateException(" Store not defined for node " + node.getId());
- }
- storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
+ throw new IllegalStateException(" Store not defined for node " + node.getId());
}
+ storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
storeDirs = storeDirValue.split(",");
for (String ns : storeDirs) {
@@ -66,8 +63,8 @@
return nodeDataStore.toString();
}
- public static AsterixConfiguration getAsterixConfiguration(String asterixConf) throws FileNotFoundException,
- IOException, JAXBException {
+ public static AsterixConfiguration getAsterixConfiguration(String asterixConf)
+ throws FileNotFoundException, IOException, JAXBException {
if (asterixConf == null) {
asterixConf = InstallerDriver.getManagixHome() + File.separator + DEFAULT_ASTERIX_CONFIGURATION_PATH;
}
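
With the store now taken only from the cluster level, each node's data directory is always namespaced by its node id under the shared store root, i.e. <store>/<nodeId>/<instance>/data/. A small sketch of the resulting layout (instance and node names are illustrative):

import java.io.File;

public class StorePathSketch {
    public static void main(String[] args) {
        String clusterStore = "/lv_scratch/store"; // cluster-level <store>
        String instanceName = "my_asterix";
        String nodeId = "nc1";
        // mirrors InstallerUtil: <store>/<nodeId>/<instance>/data/
        String storeDataSubDir = nodeId + File.separator + instanceName + File.separator + "data" + File.separator;
        for (String ns : clusterStore.split(",")) {
            System.out.println(ns + File.separator + storeDataSubDir);
        }
        // on Unix prints: /lv_scratch/store/nc1/my_asterix/data/
    }
}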
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index f22b2f1..5317fc2 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -62,8 +63,10 @@
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.feeds.AdapterIdentifier;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
@@ -108,7 +111,6 @@
private static IIOManager ioManager;
private static String metadataNodeName;
- private static String metadataStore;
private static Set<String> nodeNames;
private static String outputDir;
@@ -147,9 +149,7 @@
AsterixMetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
metadataNodeName = metadataProperties.getMetadataNodeName();
- metadataStore = metadataProperties.getMetadataStore();
nodeNames = metadataProperties.getNodeNames();
- // nodeStores = asterixProperity.getStores();
dataLifecycleManager = runtimeContext.getDatasetLifecycleManager();
localResourceRepository = runtimeContext.getLocalResourceRepository();
@@ -375,11 +375,14 @@
private static void enlistMetadataDataset(IMetadataIndex index, boolean create, MetadataTransactionContext mdTxnCtx)
throws Exception {
- String filePath = ioManager.getIODevices().get(runtimeContext.getMetaDataIODeviceId()).getPath()
- + File.separator
- + IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
- runtimeContext.getMetaDataIODeviceId());
- FileReference file = new FileReference(new File(filePath));
+ ClusterPartition metadataPartition = propertiesProvider.getMetadataProperties().getMetadataPartition();
+ int metadataDeviceId = metadataPartition.getIODeviceNum();
+ String metadataPartitionPath = SplitsAndConstraintsUtil.prepareStoragePartitionPath(
+ AsterixClusterProperties.INSTANCE.getStorageDirectoryName(),
+ metadataPartition.getPartitionId());
+ String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
+ FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
+
List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
.getVirtualBufferCaches(index.getDatasetId().getId());
ITypeTraits[] typeTraits = index.getTypeTraits();
@@ -391,7 +394,7 @@
? runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
: new BaseOperationTracker(index.getDatasetId().getId(),
dataLifecycleManager.getDatasetInfo(index.getDatasetId().getId()));
- final String path = file.getFile().getPath();
+ final String absolutePath = file.getFile().getPath();
if (create) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
@@ -409,12 +412,13 @@
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
- localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
- dataLifecycleManager.register(path, lsmBtree);
+ localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, resourceName,
+ metadataPartition.getPartitionId(), absolutePath));
+ dataLifecycleManager.register(absolutePath, lsmBtree);
} else {
- final LocalResource resource = localResourceRepository.getResourceByName(path);
+ final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
resourceID = resource.getResourceId();
- lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName());
+ lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
if (lsmBtree == null) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
typeTraits, comparatorFactories, bloomFilterKeyFields,
@@ -424,7 +428,7 @@
opTracker, runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
null, null, null, null, true);
- dataLifecycleManager.register(path, lsmBtree);
+ dataLifecycleManager.register(absolutePath, lsmBtree);
}
}
@@ -529,4 +533,4 @@
MetadataManager.INSTANCE.releaseWriteLatch();
}
}
-}
+}
\ No newline at end of file
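
enlistMetadataDataset now builds the metadata index's resource name from the metadata partition rather than a node-local metadata store, so the relative name has the shape <storage dir>/partition_<id>/<index relative path>. A sketch with illustrative values (the helper below mirrors prepareStoragePartitionPath from the new SplitsAndConstraintsUtil; the partition id and index path are made up):

import java.io.File;

public class MetadataResourceNameSketch {
    static String prepareStoragePartitionPath(String storageDirName, int partitionId) {
        // same shape as SplitsAndConstraintsUtil.prepareStoragePartitionPath
        return storageDirName + File.separator + "partition_" + partitionId;
    }

    public static void main(String[] args) {
        String storageDir = "storage"; // AsterixClusterProperties default
        int metadataPartitionId = 0;   // illustrative
        String indexRelativePath = "Metadata" + File.separator + "Dataset_idx_Dataset"; // illustrative
        String resourceName = prepareStoragePartitionPath(storageDir, metadataPartitionId)
                + File.separator + indexRelativePath;
        System.out.println(resourceName); // storage/partition_0/Metadata/Dataset_idx_Dataset
    }
}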
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
deleted file mode 100644
index 92ab90d..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata.declared;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.annotations.TypeDataGen;
-import org.apache.asterix.common.config.AsterixMetadataProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataManager;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.data.IAWriterFactory;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public class AqlCompiledMetadataDeclarations {
- private static Logger LOGGER = Logger.getLogger(AqlCompiledMetadataDeclarations.class.getName());
-
- // We are assuming that there is a one AqlCompiledMetadataDeclarations per
- // transaction.
- private final MetadataTransactionContext mdTxnCtx;
- private String dataverseName = null;
- private FileSplit outputFile;
- private Map<String, String[]> stores;
- private IDataFormat format;
- private Map<String, String> config;
-
- private final Map<String, IAType> types;
- private final Map<String, TypeDataGen> typeDataGenMap;
- private final IAWriterFactory writerFactory;
-
- private IMetadataManager metadataManager = MetadataManager.INSTANCE;
- private boolean isConnected = false;
-
- public AqlCompiledMetadataDeclarations(MetadataTransactionContext mdTxnCtx, String dataverseName,
- FileSplit outputFile, Map<String, String> config, Map<String, String[]> stores, Map<String, IAType> types,
- Map<String, TypeDataGen> typeDataGenMap, IAWriterFactory writerFactory, boolean online) {
- this.mdTxnCtx = mdTxnCtx;
- this.dataverseName = dataverseName;
- this.outputFile = outputFile;
- this.config = config;
- AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
- if (stores == null && online) {
- this.stores = metadataProperties.getStores();
- } else {
- this.stores = stores;
- }
- this.types = types;
- this.typeDataGenMap = typeDataGenMap;
- this.writerFactory = writerFactory;
- }
-
- public void connectToDataverse(String dvName) throws AlgebricksException, AsterixException {
- if (isConnected) {
- throw new AlgebricksException("You are already connected to " + dataverseName + " dataverse");
- }
- Dataverse dv;
- try {
- dv = metadataManager.getDataverse(mdTxnCtx, dvName);
- } catch (Exception e) {
- throw new AsterixException(e);
- }
- if (dv == null) {
- throw new AlgebricksException("There is no dataverse with this name " + dvName + " to connect to.");
- }
- dataverseName = dvName;
- isConnected = true;
- try {
- format = (IDataFormat) Class.forName(dv.getDataFormat()).newInstance();
- } catch (Exception e) {
- throw new AsterixException(e);
- }
- }
-
- public void disconnectFromDataverse() throws AlgebricksException {
- if (!isConnected) {
- throw new AlgebricksException("You are not connected to any dataverse");
- }
- dataverseName = null;
- format = null;
- isConnected = false;
- }
-
- public boolean isConnectedToDataverse() {
- return isConnected;
- }
-
- public String getDataverseName() {
- return dataverseName;
- }
-
- public FileSplit getOutputFile() {
- return outputFile;
- }
-
- public IDataFormat getFormat() throws AlgebricksException {
- if (!isConnected) {
- throw new AlgebricksException("You need first to connect to a dataverse.");
- }
- return format;
- }
-
- public String getPropertyValue(String propertyName) {
- return config.get(propertyName);
- }
-
- public IAType findType(String typeName) {
- Datatype type;
- try {
- type = metadataManager.getDatatype(mdTxnCtx, dataverseName, typeName);
- } catch (Exception e) {
- throw new IllegalStateException();
- }
- if (type == null) {
- throw new IllegalStateException();
- }
- return type.getDatatype();
- }
-
- public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException {
- NodeGroup ng;
- try {
- ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- if (ng == null) {
- throw new AlgebricksException("No node group with this name " + nodeGroupName);
- }
- return ng.getNodeNames();
- }
-
- public Map<String, String[]> getAllStores() {
- return stores;
- }
-
- public Dataset findDataset(String datasetName) throws AlgebricksException {
- try {
- return metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
- try {
- return metadataManager.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public Index getDatasetPrimaryIndex(String dataverseName, String datasetName) throws AlgebricksException {
- try {
- return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
- try {
- return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public void setOutputFile(FileSplit outputFile) {
- this.outputFile = outputFile;
- }
-
- public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- String datasetName, String targetIdxName) throws AlgebricksException {
- FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
- String[] loc = new String[splits.length];
- for (int p = 0; p < splits.length; p++) {
- loc[p] = splits[p].getNodeName();
- }
- AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
- return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
- }
-
- private FileSplit[] splitsForInternalOrFeedDataset(String datasetName, String targetIdxName)
- throws AlgebricksException {
-
- File relPathFile = new File(getRelativePath(datasetName + "_idx_" + targetIdxName));
- Dataset dataset = findDataset(datasetName);
- if (dataset.getDatasetType() != DatasetType.INTERNAL) {
- throw new AlgebricksException("Not an internal dataset");
- }
- List<String> nodeGroup = findNodeGroupNodeNames(dataset.getNodeGroupName());
- if (nodeGroup == null) {
- throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
- }
-
- List<FileSplit> splitArray = new ArrayList<FileSplit>();
- for (String nd : nodeGroup) {
- String[] nodeStores = stores.get(nd);
- if (nodeStores == null) {
- LOGGER.warning("Node " + nd + " has no stores.");
- throw new AlgebricksException("Node " + nd + " has no stores.");
- } else {
- for (int j = 0; j < nodeStores.length; j++) {
- File f = new File(nodeStores[j] + File.separator + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f)));
- }
- }
- }
- FileSplit[] splits = new FileSplit[splitArray.size()];
- int i = 0;
- for (FileSplit fs : splitArray) {
- splits[i++] = fs;
- }
- return splits;
- }
-
- public String getRelativePath(String fileName) {
- return dataverseName + File.separator + fileName;
- }
-
- public Map<String, TypeDataGen> getTypeDataGenMap() {
- return typeDataGenMap;
- }
-
- public Map<String, IAType> getTypeDeclarations() {
- return types;
- }
-
- public IAWriterFactory getWriterFactory() {
- return writerFactory;
- }
-
- public MetadataTransactionContext getMetadataTransactionContext() {
- return mdTxnCtx;
- }
-}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
index ad06737..e9603366 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
@@ -19,10 +19,6 @@
package org.apache.asterix.metadata.declared;
-import java.util.ArrayList;
-import java.util.Map;
-
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
@@ -49,16 +45,6 @@
@Override
public AlgebricksPartitionConstraint getClusterLocations() {
- Map<String, String[]> stores = metadataProvider.getAllStores();
- ArrayList<String> locs = new ArrayList<String>();
- for (String k : stores.keySet()) {
- String[] nodeStores = stores.get(k);
- for (int j = 0; j < nodeStores.length; j++) {
- locs.add(k);
- }
- }
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- return new AlgebricksAbsolutePartitionConstraint(cluster);
+ return metadataProvider.getClusterLocations();
}
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index d61d323..745f436 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -91,6 +91,7 @@
import org.apache.asterix.metadata.feeds.FeedUtil;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -148,7 +149,6 @@
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.ShortPointable;
@@ -160,16 +160,13 @@
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
@@ -2095,10 +2092,6 @@
return jobId;
}
- public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
- return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
- }
-
public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
throws AlgebricksException {
return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
@@ -2128,7 +2121,7 @@
List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
.getNodeNames();
for (String nd : nodeGroup) {
- numPartitions += AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
+ numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
}
return numElementsHint /= numPartitions;
}
@@ -2141,88 +2134,17 @@
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
- return splitProviderAndPartitionConstraints(splits);
+ return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraints(splits);
}
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
String dataverse) {
- FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverse);
- return splitProviderAndPartitionConstraints(splits);
- }
-
- private Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
- FileSplit[] splits) {
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
- String[] loc = new String[splits.length];
- for (int p = 0; p < splits.length; p++) {
- loc[p] = splits[p].getNodeName();
- }
- AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
- return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
- }
-
- private FileSplit[] splitsForDataverse(MetadataTransactionContext mdTxnCtx, String dataverseName) {
- File relPathFile = new File(dataverseName);
- List<FileSplit> splits = new ArrayList<FileSplit>();
- for (Map.Entry<String, String[]> entry : stores.entrySet()) {
- String node = entry.getKey();
- String[] nodeStores = entry.getValue();
- if (nodeStores == null) {
- continue;
- }
- for (int i = 0; i < nodeStores.length; i++) {
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
- String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(node);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator + relPathFile);
- splits.add(new FileSplit(node, new FileReference(f), k));
- }
- }
- }
- }
- return splits.toArray(new FileSplit[] {});
+ return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
}
public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
String targetIdxName, boolean temp) throws AlgebricksException {
- try {
- File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
- .getNodeNames();
- if (nodeGroup == null) {
- throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
- }
-
- List<FileSplit> splitArray = new ArrayList<FileSplit>();
- for (String nd : nodeGroup) {
- String[] nodeStores = stores.get(nd);
- if (nodeStores == null) {
- LOGGER.warning("Node " + nd + " has no stores.");
- throw new AlgebricksException("Node " + nd + " has no stores.");
- } else {
- int numIODevices;
- if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
- numIODevices = 1;
- } else {
- numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
- }
- String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- File f = new File(ioDevices[k] + File.separator + nodeStores[j]
- + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
- + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f), k));
- }
- }
- }
- }
- return splitArray.toArray(new FileSplit[0]);
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
+ return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
}
private static Map<String, String> initializeAdapterFactoryMapping() {
@@ -2256,10 +2178,6 @@
return adapter;
}
- private static String getRelativePath(String dataverseName, String fileName) {
- return dataverseName + File.separator + fileName;
- }
-
public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
try {
return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
@@ -2307,19 +2225,7 @@
}
public AlgebricksPartitionConstraint getClusterLocations() {
- ArrayList<String> locs = new ArrayList<String>();
- for (String i : stores.keySet()) {
- String[] nodeStores = stores.get(i);
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
- }
- }
- }
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- return new AlgebricksAbsolutePartitionConstraint(cluster);
+ return AsterixClusterProperties.INSTANCE.getClusterLocations();
}
public IDataFormat getFormat() {
@@ -2334,7 +2240,7 @@
* @return a new map containing the original dataset properties and the
* scheduler/locations
*/
- private Map<String, Object> wrapProperties(Map<String, String> properties) {
+ private static Map<String, Object> wrapProperties(Map<String, String> properties) {
Map<String, Object> wrappedProperties = new HashMap<String, Object>();
wrappedProperties.putAll(properties);
// wrappedProperties.put(SCHEDULER, hdfsScheduler);
@@ -2349,7 +2255,7 @@
* the original properties
 * @return the new string-object map
*/
- private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+ private static Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
Map<String, Object> wrappedProperties = new HashMap<String, Object>();
wrappedProperties.putAll(properties);
return wrappedProperties;
@@ -2357,58 +2263,8 @@
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
- FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
- return splitProviderAndPartitionConstraints(splits);
- }
-
- private FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
- String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-
- try {
- File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
- .getNodeNames();
- if (nodeGroup == null) {
- throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
- }
-
- List<FileSplit> splitArray = new ArrayList<FileSplit>();
- for (String nd : nodeGroup) {
- String[] nodeStores = stores.get(nd);
- if (nodeStores == null) {
- LOGGER.warning("Node " + nd + " has no stores.");
- throw new AlgebricksException("Node " + nd + " has no stores.");
- } else {
- // Only the first partition when create
- String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
- if (create) {
- for (int j = 0; j < nodeStores.length; j++) {
- File f = new File(
- ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f), 0));
- }
- } else {
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- File f = new File(
- ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f), 0));
- }
- }
- }
- }
- }
- FileSplit[] splits = new FileSplit[splitArray.size()];
- int i = 0;
- for (FileSplit fs : splitArray) {
- splits[i++] = fs;
- }
- return splits;
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
+ return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
+ datasetName, targetIdxName, create);
}
public AsterixStorageProperties getStorageProperties() {
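
The split-provider methods above are now thin wrappers over SplitsAndConstraintsUtil, and callers keep receiving a Pair of split provider and location constraint. A compile-only sketch of the consuming side (operator wiring is elided; the helper name is illustrative):

import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;

public class SplitPairUsageSketch {
    // Illustrative consumer: the provider parameterizes the operator's file
    // access, the constraint pins each operator instance to a partition's node.
    static void wireOperator(Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint) {
        IFileSplitProvider splitProvider = splitsAndConstraint.first;
        AlgebricksPartitionConstraint constraint = splitsAndConstraint.second;
        // ... pass splitProvider to the operator descriptor and apply the
        // constraint when building the job (elided)
    }
}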
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
new file mode 100644
index 0000000..5ef58cd
--- /dev/null
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataConstants;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class SplitsAndConstraintsUtil {
+
+ public static final String PARTITION_DIR_PREFIX = "partition_";
+ public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
+ public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
+
+ private static FileSplit[] splitsForDataverse(String dataverseName) {
+ File relPathFile = new File(dataverseName);
+ List<FileSplit> splits = new ArrayList<FileSplit>();
+ //get all partitions
+ ClusterPartition[] clusterPartition = AsterixClusterProperties.INSTANCE.getClusterPartitons();
+ String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+ for (int j = 0; j < clusterPartition.length; j++) {
+ int nodeParitions = AsterixClusterProperties.INSTANCE
+ .getNodePartitionsCount(clusterPartition[j].getNodeId());
+ for (int i = 0; i < nodeParitions; i++) {
+ File f = new File(prepareStoragePartitionPath(storageDirName, clusterPartition[i].getPartitionId())
+ + File.separator + relPathFile);
+ splits.add(getFileSplitForClusterPartition(clusterPartition[j], f));
+ }
+ }
+ return splits.toArray(new FileSplit[] {});
+ }
+
+ public static FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
+ String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+ try {
+ File relPathFile = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+ Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+ .getNodeNames();
+ if (nodeGroup == null) {
+ throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+ }
+
+ String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+ List<FileSplit> splits = new ArrayList<FileSplit>();
+ for (String nd : nodeGroup) {
+ int numPartitions = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
+ ClusterPartition[] nodePartitions = AsterixClusterProperties.INSTANCE.getNodePartitions(nd);
+ //currently this case is never executed since the metadata node group doesn't exist
+ if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
+ numPartitions = 1;
+ }
+
+ for (int k = 0; k < numPartitions; k++) {
+ //format: 'storage dir name'/partition_#/dataverse/dataset_idx_index
+ File f = new File(prepareStoragePartitionPath(storageDirName, nodePartitions[k].getPartitionId())
+ + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
+ + relPathFile);
+ splits.add(getFileSplitForClusterPartition(nodePartitions[k], f));
+ }
+ }
+ return splits.toArray(new FileSplit[] {});
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
+ }
+ }
+
+ private static FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
+ String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+ try {
+ File relPathFile = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+ Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+ .getNodeNames();
+ if (nodeGroup == null) {
+ throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+ }
+
+ List<FileSplit> splits = new ArrayList<FileSplit>();
+ for (String nodeId : nodeGroup) {
+ //get node partitions
+ ClusterPartition[] nodePartitions = AsterixClusterProperties.INSTANCE.getNodePartitions(nodeId);
+ String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+ int firstPartition = 0;
+ if (create) {
+ // Only the first partition when create
+ File f = new File(
+ prepareStoragePartitionPath(storageDirName, nodePartitions[firstPartition].getPartitionId())
+ + File.separator + relPathFile);
+ splits.add(getFileSplitForClusterPartition(nodePartitions[firstPartition], f));
+ } else {
+ for (int k = 0; k < nodePartitions.length; k++) {
+ File f = new File(prepareStoragePartitionPath(storageDirName,
+ nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
+ splits.add(getFileSplitForClusterPartition(nodePartitions[firstPartition], f));
+ }
+ }
+ }
+ return splits.toArray(new FileSplit[] {});
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
+ }
+ }
+
+ public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+ String dataverse) {
+ FileSplit[] splits = splitsForDataverse(dataverse);
+ return splitProviderAndPartitionConstraints(splits);
+ }
+
+ public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
+ MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName,
+ boolean create) throws AlgebricksException {
+ FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
+ return splitProviderAndPartitionConstraints(splits);
+ }
+
+ public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
+ FileSplit[] splits) {
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+ String[] loc = new String[splits.length];
+ for (int p = 0; p < splits.length; p++) {
+ loc[p] = splits[p].getNodeName();
+ }
+ AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
+ return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
+ }
+
+ private static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, File relativeFile) {
+ return new FileSplit(partition.getActiveNodeId(), new FileReference(relativeFile), partition.getIODeviceNum(),
+ partition.getPartitionId());
+ }
+
+ public static String prepareStoragePartitionPath(String storageDirName, int partitionId) {
+ return storageDirName + File.separator + PARTITION_DIR_PREFIX + partitionId;
+ }
+
+ private static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
+ return dataverseName + File.separator + datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName;
+ }
+}
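
To make the new layout concrete: a dataset index split now resolves, relative to its partition's io device, to <storage dir>/partition_<id>/<dataverse>/<dataset>_idx_<index>. A tiny sketch with illustrative names:

import java.io.File;

public class DatasetSplitPathSketch {
    static final String PARTITION_DIR_PREFIX = "partition_";
    static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";

    public static void main(String[] args) {
        String storageDirName = "storage";
        int partitionId = 3;       // illustrative
        String dataverse = "tpch";
        String dataset = "LineItem";
        String index = "LineItem"; // a primary index shares the dataset name
        String relPath = dataverse + File.separator + dataset + DATASET_INDEX_NAME_SEPARATOR + index;
        String splitPath = storageDirName + File.separator + PARTITION_DIR_PREFIX + partitionId
                + File.separator + relPath;
        // storage/partition_3/tpch/LineItem_idx_LineItem (relative to the partition's io device)
        System.out.println(splitPath);
    }
}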
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index f2482da..95eea63 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -33,6 +34,7 @@
import javax.xml.bind.Unmarshaller;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -50,7 +52,7 @@
public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
private static final String IO_DEVICES = "iodevices";
-
+ private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
private final Cluster cluster;
@@ -59,6 +61,9 @@
private boolean globalRecoveryCompleted = false;
+ private Map<String, ClusterPartition[]> node2PartitionsMap = null;
+ private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+
private AsterixClusterProperties() {
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
if (is != null) {
@@ -66,37 +71,73 @@
JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
Unmarshaller unmarshaller = ctx.createUnmarshaller();
cluster = (Cluster) unmarshaller.unmarshal(is);
-
} catch (JAXBException e) {
throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
}
} else {
cluster = null;
}
+ //if this is the CC process
+ if (AsterixAppContextInfo.getInstance() != null) {
+ if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
+ node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
+ clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
+ }
+ }
}
private ClusterState state = ClusterState.UNUSABLE;
public synchronized void removeNCConfiguration(String nodeId) {
+ updateNodePartitions(nodeId, false);
ncConfiguration.remove(nodeId);
- if (ncConfiguration.keySet().size() != AsterixAppContextInfo.getInstance().getMetadataProperties()
- .getNodeNames().size()) {
- state = ClusterState.UNUSABLE;
- LOGGER.info("Cluster now is in UNSABLE state");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Removing configuration parameters for node id " + nodeId);
}
- resetClusterPartitionConstraint();
+ //TODO implement fault tolerance as follows:
+ //1. collect the partitions of the failed NC
+ //2. For each partition, request a remote replica to take over.
+ //3. wait until each remote replica completes the recovery for the lost partitions
+ //4. update the cluster state
}
public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
ncConfiguration.put(nodeId, configuration);
- if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
- .getNodeNames().size()) {
- state = ClusterState.ACTIVE;
- }
+ updateNodePartitions(nodeId, true);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" Registering configuration parameters for node id " + nodeId);
}
- resetClusterPartitionConstraint();
+ }
+
+ private synchronized void updateNodePartitions(String nodeId, boolean added) {
+ ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
+ //if this isn't a storage node, it will not have cluster partitions
+ if (nodePartitions != null) {
+ for (ClusterPartition p : nodePartitions) {
+ //set the active node for this node's partitions
+ p.setActive(added);
+ if (added) {
+ p.setActiveNodeId(nodeId);
+ } else {
+ p.setActiveNodeId(null);
+ }
+ }
+ resetClusterPartitionConstraint();
+ updateClusterState();
+ }
+ }
+
+ private synchronized void updateClusterState() {
+ for (ClusterPartition p : clusterPartitions.values()) {
+ if (!p.isActive()) {
+ state = ClusterState.UNUSABLE;
+ LOGGER.info("Cluster is in UNSABLE state");
+ return;
+ }
+ }
+ //if all storage partitions are active, then the cluster is active
+ state = ClusterState.ACTIVE;
+ LOGGER.info("Cluster is now ACTIVE");
}
/**
@@ -162,20 +203,14 @@
}
private synchronized void resetClusterPartitionConstraint() {
- Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
- ArrayList<String> locs = new ArrayList<String>();
- for (String i : stores.keySet()) {
- String[] nodeStores = stores.get(i);
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
- }
+ ArrayList<String> clusterActiveLocations = new ArrayList<>();
+ for (ClusterPartition p : clusterPartitions.values()) {
+ if (p.isActive()) {
+ clusterActiveLocations.add(p.getActiveNodeId());
}
}
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(cluster);
+ clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+ clusterActiveLocations.toArray(new String[] {}));
}
public boolean isGlobalRecoveryCompleted() {
@@ -194,7 +229,34 @@
return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
}
- public static int getNumberOfNodes(){
+ public static int getNumberOfNodes() {
return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
}
+
+ public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
+ return node2PartitionsMap.get(nodeId);
+ }
+
+    public synchronized int getNodePartitionsCount(String node) {
+        ClusterPartition[] nodePartitions = node2PartitionsMap.get(node);
+        return nodePartitions == null ? 0 : nodePartitions.length;
+    }
+
+    public synchronized ClusterPartition[] getClusterPartitons() {
+        return clusterPartitions.values().toArray(new ClusterPartition[] {});
+    }
+
+ public String getStorageDirectoryName() {
+ if (cluster != null) {
+ return cluster.getStore();
+ }
+ //virtual cluster without cluster config file
+ return DEFAULT_STORAGE_DIR_NAME;
+ }
}
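To see how the maps consulted above get their shape: every storage node contributes one unique cluster partition per configured store, so a 2-node cluster with 2 stores per node yields partitions {0, 1} on nc1 and {2, 3} on nc2. A self-contained sketch (the ClusterPartition constructor signature is assumed for illustration, not taken from this change):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    static void dividePartitions(List<String> nodes, int storesPerNode,
            Map<String, ClusterPartition[]> node2PartitionsMap,
            SortedMap<Integer, ClusterPartition> clusterPartitions) {
        int partitionId = 0;
        for (String nodeId : nodes) {
            ClusterPartition[] nodePartitions = new ClusterPartition[storesPerNode];
            for (int i = 0; i < storesPerNode; i++) {
                //assumed constructor: ClusterPartition(partitionId, originalNodeId, ioDeviceNum)
                nodePartitions[i] = new ClusterPartition(partitionId, nodeId, i);
                clusterPartitions.put(partitionId, nodePartitions[i]);
                partitionId++;
            }
            node2PartitionsMap.put(nodeId, nodePartitions);
        }
    }

    //e.g.: dividePartitions(Arrays.asList("nc1", "nc2"), 2, new HashMap<>(), new TreeMap<>());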
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 58bc44e..794f867 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -50,13 +50,13 @@
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 48ebff3..06a1957 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -47,12 +47,12 @@
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 23eb2be..f2a6820 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 6e0394a..8d838a3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
index 0566367..15224e2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
@@ -33,7 +33,7 @@
}
@Override
- public LocalResource createLocalResource(long resourceId, String resourceName, int partition) {
- return new LocalResource(resourceId, resourceName, partition, resourceType, localResourceMetadata);
+ public LocalResource createLocalResource(long resourceId, String resourceName, int partition, String resourcePath) {
+ return new LocalResource(resourceId, resourceName, partition, resourcePath, resourceType, localResourceMetadata);
}
}
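The new resourcePath argument keeps the node-relative resource name separate from the absolute path it resolves to on a particular node. A usage sketch with purely illustrative values (the directory names below are hypothetical, not taken from this change):

    long resourceId = 42L;
    int partition = 1;
    String resourceName = "storage/partition_1/Default/Customers/Customers"; //relative, same on every replica
    String resourcePath = "/mnt/iodevice0/storage/partition_1/Default/Customers/Customers"; //absolute on this node
    LocalResource resource = factory.createLocalResource(resourceId, resourceName, partition, resourcePath);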
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 8ae3eb1..52fd806 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -64,8 +64,7 @@
private boolean isReplicationEnabled = false;
private Set<String> filesToBeReplicated;
- public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId)
- throws HyracksDataException {
+ public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
mountPoints = new String[devices.size()];
this.nodeId = nodeId;
for (int i = 0; i < mountPoints.length; i++) {
@@ -123,7 +122,8 @@
}
LocalResource rootLocalResource = new LocalResource(STORAGE_LOCAL_RESOURCE_ID,
- storageMetadataFile.getAbsolutePath(), 0, 0, storageRootDirPath);
+ storageMetadataFile.getAbsolutePath(), 0, storageMetadataFile.getAbsolutePath(), 0,
+ storageRootDirPath);
insert(rootLocalResource);
LOGGER.log(Level.INFO, "created the root-metadata-file: " + storageMetadataFile.getAbsolutePath());
}
@@ -131,13 +131,13 @@
}
@Override
- public LocalResource getResourceByName(String name) throws HyracksDataException {
- LocalResource resource = resourceCache.getIfPresent(name);
+ public LocalResource getResourceByPath(String path) throws HyracksDataException {
+ LocalResource resource = resourceCache.getIfPresent(path);
if (resource == null) {
- File resourceFile = getLocalResourceFileByName(name);
+            File resourceFile = getLocalResourceFileByPath(path);
if (resourceFile.exists()) {
resource = readLocalResource(resourceFile);
- resourceCache.put(name, resource);
+ resourceCache.put(path, resource);
}
}
return resource;
@@ -145,13 +145,15 @@
@Override
public synchronized void insert(LocalResource resource) throws HyracksDataException {
- File resourceFile = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
+ File resourceFile = new File(getFileName(resource.getResourcePath(), resource.getResourceId()));
if (resourceFile.exists()) {
throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
+ } else {
+ resourceFile.getParentFile().mkdirs();
}
if (resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
- resourceCache.put(resource.getResourceName(), resource);
+ resourceCache.put(resource.getResourcePath(), resource);
}
FileOutputStream fos = null;
@@ -182,18 +184,18 @@
//if replication enabled, send resource metadata info to remote nodes
if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
- String filePath = getFileName(resource.getResourceName(), resource.getResourceId());
+ String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
createReplicationJob(ReplicationOperation.REPLICATE, filePath);
}
}
}
@Override
- public synchronized void deleteResourceByName(String name) throws HyracksDataException {
- File resourceFile = getLocalResourceFileByName(name);
+ public synchronized void deleteResourceByPath(String resourcePath) throws HyracksDataException {
+        File resourceFile = getLocalResourceFileByPath(resourcePath);
if (resourceFile.exists()) {
resourceFile.delete();
- resourceCache.invalidate(name);
+ resourceCache.invalidate(resourcePath);
//if replication enabled, delete resource from remote replicas
if (isReplicationEnabled && !resourceFile.getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
@@ -204,8 +206,8 @@
}
}
- private static File getLocalResourceFileByName(String resourceName) {
- return new File(resourceName + File.separator + METADATA_FILE_NAME);
+    private static File getLocalResourceFileByPath(String resourcePath) {
+ return new File(resourcePath + File.separator + METADATA_FILE_NAME);
}
public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
@@ -220,25 +222,21 @@
}
//load all local resources.
- File[] dataverseFileList = storageRootDir.listFiles();
- if (dataverseFileList != null) {
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] ioDevicesList = indexFile.listFiles();
- if (ioDevicesList != null) {
- for (File ioDeviceFile : ioDevicesList) {
- if (ioDeviceFile.isDirectory()) {
- File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- resourcesMap.put(localResource.getResourceId(), localResource);
- }
- }
+            File[] partitions = storageRootDir.listFiles();
+            if (partitions == null) {
+                continue;
+            }
+            for (File partition : partitions) {
+ File[] dataverseFileList = partition.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ resourcesMap.put(localResource.getResourceId(), localResource);
}
}
}
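The flattened traversal above mirrors the new on-disk layout: the per-IO-device directory level is gone, and partition directories sit directly under the storage root, e.g. (names illustrative):

    <mountPoint>/<storageRoot>/partition_0/MyDataverse/MyDataset_MyIndex/<local resource metadata file>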
@@ -263,27 +261,23 @@
continue;
}
- //traverse all local resources.
- File[] dataverseFileList = storageRootDir.listFiles();
- if (dataverseFileList != null) {
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] ioDevicesList = indexFile.listFiles();
- if (ioDevicesList != null) {
- for (File ioDeviceFile : ioDevicesList) {
- if (ioDeviceFile.isDirectory()) {
- File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- maxResourceId = Math.max(maxResourceId,
- localResource.getResourceId());
- }
- }
+ //load all local resources.
+        File[] partitions = storageRootDir.listFiles();
+        if (partitions == null) {
+            continue;
+        }
+        for (File partition : partitions) {
+ //traverse all local resources.
+ File[] dataverseFileList = partition.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
}
}
}
@@ -305,8 +299,7 @@
if (!baseDir.endsWith(System.getProperty("file.separator"))) {
baseDir += System.getProperty("file.separator");
}
- String fileName = new String(baseDir + METADATA_FILE_NAME);
- return fileName;
+        return baseDir + METADATA_FILE_NAME;
}
}
@@ -376,6 +369,7 @@
/**
     * Deletes physical files of all dataverses.
+ *
* @param deleteStorageMetadata
* @throws IOException
*/
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index f602156..701e529 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -44,11 +44,13 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.SortedMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -70,11 +72,14 @@
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -295,10 +300,10 @@
//get datasetLifeCycleManager
IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
+ IIOManager ioManager = appRuntimeContext.getIOManager();
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
.loadAndGetAllResources();
-
//set log reader to the lowWaterMarkLsn again.
logReader.initializeScan(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -346,14 +351,15 @@
//get index instance from IndexLifeCycleManager
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
- index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
+ String resourceAbsolutePath = localResource.getResourcePath();
+ index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
if (index == null) {
//#. create index instance and register to indexLifeCycleManager
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
- localResource.getResourceName(), localResource.getPartition());
- datasetLifecycleManager.register(localResource.getResourceName(), index);
- datasetLifecycleManager.open(localResource.getResourceName());
+ resourceAbsolutePath, localResource.getPartition());
+ datasetLifecycleManager.register(resourceAbsolutePath, index);
+ datasetLifecycleManager.open(resourceAbsolutePath);
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = index;
@@ -387,7 +393,7 @@
//close all indexes
Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
for (long r : resourceIdList) {
- datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
+ datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -501,6 +507,9 @@
//#. get indexLifeCycleManager
IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+ IIOManager ioManager = appRuntimeContext.getIOManager();
+ SortedMap<Integer, ClusterPartition> clusterPartitions = ((IAsterixPropertiesProvider) appRuntimeContext
+ .getAppContext()).getMetadataProperties().getClusterPartitions();
IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
@@ -552,12 +561,23 @@
//get index instance from IndexLifeCycleManager
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
- index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
+            //resolve the resource's node-relative name into an absolute path on this node
+ int resourcePartition = localResource.getPartition();
+ //get partition io device id
+ //NOTE:
+ //currently we store all partition in the same IO device in all nodes. If this changes,
+ //this needs to be updated to find the IO device in which the partition is stored in this local node.
+ int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum();
+ String resourceAbsolutePath = ioManager
+ .getAbsoluteFileRef(ioDevice, localResource.getResourceName()).getFile()
+ .getAbsolutePath();
+ localResource.setResourcePath(resourceAbsolutePath);
+ index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
if (index == null) {
//#. create index instance and register to indexLifeCycleManager
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
- localResource.getResourceName(), localResource.getPartition());
+ resourceAbsolutePath, localResource.getPartition());
-                        datasetLifecycleManager.register(localResource.getResourceName(), index);
-                        datasetLifecycleManager.open(localResource.getResourceName());
+                        datasetLifecycleManager.register(resourceAbsolutePath, index);
+                        datasetLifecycleManager.open(resourceAbsolutePath);
@@ -595,7 +615,7 @@
//close all indexes
Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
for (long r : resourceIdList) {
- datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
+ datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
}
if (LOGGER.isLoggable(Level.INFO)) {
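To make the path resolution above concrete: if cluster partition 3 is assigned IO device 0, whose mount point on this node is /data/0 (illustrative values), the node-relative resource name resolves as follows:

    int ioDevice = clusterPartitions.get(3).getIODeviceNum(); //0
    String relativeName = localResource.getResourceName();    //e.g. "storage/partition_3/Default/Customers/Customers"
    //yields "/data/0/storage/partition_3/Default/Customers/Customers"
    String absolutePath = ioManager.getAbsoluteFileRef(ioDevice, relativeName).getFile().getAbsolutePath();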
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
index e84135e..4664fb1 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
@@ -30,13 +30,26 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.event.schema.yarnCluster.Cluster;
+import org.apache.asterix.event.schema.yarnCluster.MasterNode;
+import org.apache.asterix.event.schema.yarnCluster.Node;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -73,12 +86,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
-
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.MasterNode;
-import org.apache.asterix.event.schema.yarnCluster.Node;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -1041,14 +1048,12 @@
if (iodevice == null) {
iodevice = clusterDesc.getIodevices();
}
- String storageSuffix = local.getStore() == null ? clusterDesc.getStore() : local.getStore();
- String storagePath = iodevice + File.separator + storageSuffix;
vargs.add(ncJavaOpts);
vargs.add(NC_CLASSNAME);
vargs.add("-app-nc-main-class org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint");
vargs.add("-node-id " + local.getId());
vargs.add("-cc-host " + cC.getClusterIp());
- vargs.add("-iodevices " + storagePath);
+ vargs.add("-iodevices " + iodevice);
vargs.add("-cluster-net-ip-address " + local.getClusterIp());
vargs.add("-data-ip-address " + local.getClusterIp());
vargs.add("-result-ip-address " + local.getClusterIp());
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
index dc61506..f1123aa 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
@@ -38,6 +38,12 @@
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.configuration.Coredump;
+import org.apache.asterix.common.configuration.Store;
+import org.apache.asterix.common.configuration.TransactionLogDir;
+import org.apache.asterix.event.schema.yarnCluster.Cluster;
+import org.apache.asterix.event.schema.yarnCluster.Node;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -76,13 +82,6 @@
import com.google.common.collect.ImmutableMap;
-import org.apache.asterix.common.configuration.AsterixConfiguration;
-import org.apache.asterix.common.configuration.Coredump;
-import org.apache.asterix.common.configuration.Store;
-import org.apache.asterix.common.configuration.TransactionLogDir;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.Node;
-
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class AsterixYARNClient {
@@ -113,13 +112,13 @@
}
}
- public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap
- .<String, AsterixYARNClient.Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL)
- .put(Mode.START.alias, Mode.START).put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL)
- .put(Mode.DESTROY.alias, Mode.DESTROY).put(Mode.ALTER.alias, Mode.ALTER)
- .put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL).put(Mode.DESCRIBE.alias, Mode.DESCRIBE)
- .put(Mode.BACKUP.alias, Mode.BACKUP).put(Mode.LSBACKUP.alias, Mode.LSBACKUP)
- .put(Mode.RMBACKUP.alias, Mode.RMBACKUP).put(Mode.RESTORE.alias, Mode.RESTORE).build();
+ public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap.<String, AsterixYARNClient
+ .Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL).put(Mode.START.alias, Mode.START)
+ .put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL).put(Mode.DESTROY.alias, Mode.DESTROY)
+ .put(Mode.ALTER.alias, Mode.ALTER).put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL)
+ .put(Mode.DESCRIBE.alias, Mode.DESCRIBE).put(Mode.BACKUP.alias, Mode.BACKUP)
+ .put(Mode.LSBACKUP.alias, Mode.LSBACKUP).put(Mode.RMBACKUP.alias, Mode.RMBACKUP)
+ .put(Mode.RESTORE.alias, Mode.RESTORE).build();
private static final Log LOG = LogFactory.getLog(AsterixYARNClient.class);
public static final String CONF_DIR_REL = ".asterix" + File.separator;
private static final String instanceLock = "instance";
@@ -223,8 +222,8 @@
}
break;
case KILL:
- if (client.isRunning() &&
- Utils.confirmAction("Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
+ if (client.isRunning() && Utils.confirmAction(
+ "Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
try {
AsterixYARNClient.killApplication(client.getLockFile(), client.yarnClient);
} catch (ApplicationNotFoundException e) {
@@ -232,8 +231,7 @@
System.out.println("Asterix instance by that name already exited or was never started");
client.deleteLockFile();
}
- }
- else if(!client.isRunning()){
+ } else if (!client.isRunning()) {
System.out.println("Asterix instance by that name already exited or was never started");
client.deleteLockFile();
}
@@ -254,8 +252,8 @@
break;
case DESTROY:
try {
- if (client.force
- || Utils.confirmAction("Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
+ if (client.force || Utils.confirmAction(
+ "Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
app = client.makeApplicationContext();
res = client.deployConfig();
res.addAll(client.distributeBinaries());
@@ -289,7 +287,8 @@
}
break;
default:
- LOG.fatal("Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
+ LOG.fatal(
+ "Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
client.printUsage();
System.exit(-1);
}
@@ -366,8 +365,8 @@
"Amount of memory in MB to be requested to run the application master"));
opts.addOption(new Option("log_properties", true, "log4j.properties file"));
opts.addOption(new Option("n", "name", true, "Asterix instance name (required)"));
- opts.addOption(new Option("zip", "asterixZip", true,
- "zip file with AsterixDB inside- if in non-default location"));
+ opts.addOption(
+ new Option("zip", "asterixZip", true, "zip file with AsterixDB inside- if in non-default location"));
opts.addOption(new Option("bc", "baseConfig", true,
"base Asterix parameters configuration file if not in default position"));
opts.addOption(new Option("c", "asterixConf", true, "Asterix cluster config (required on install)"));
@@ -381,13 +380,13 @@
"Backup timestamp for arguments requiring a specific backup (rm, restore)"));
opts.addOption(new Option("v", "debug", false, "Dump out debug information"));
opts.addOption(new Option("help", false, "Print usage"));
- opts.addOption(new Option("f", "force", false,
- "Execute this command as fully as possible, disregarding any caution"));
+ opts.addOption(
+ new Option("f", "force", false, "Execute this command as fully as possible, disregarding any caution"));
return opts;
}
/**
- */
+ */
public AsterixYARNClient() throws Exception {
this(new YarnConfiguration());
}
@@ -589,8 +588,9 @@
try {
ApplicationReport previousAppReport = yarnClient.getApplicationReport(lockAppId);
YarnApplicationState prevStatus = previousAppReport.getYarnApplicationState();
- if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED || prevStatus == YarnApplicationState.FINISHED)
- && mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+ if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED
+ || prevStatus == YarnApplicationState.FINISHED) && mode != Mode.DESTROY && mode != Mode.BACKUP
+ && mode != Mode.RESTORE) {
throw new IllegalStateException("Instance is already running in: " + lockAppId);
} else if (mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
//stale lock file
@@ -598,7 +598,8 @@
deleteLockFile();
}
} catch (YarnException e) {
- LOG.warn("Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
+ LOG.warn(
+ "Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
deleteLockFile();
}
}
@@ -690,6 +691,7 @@
/**
* Upload External libraries and functions to HDFS for an instance to use when started
+ *
* @throws IllegalStateException
* @throws IOException
*/
@@ -700,8 +702,8 @@
throw new IllegalStateException("No instance by name " + instanceName + " found.");
}
if (isRunning()) {
- throw new IllegalStateException("Instance " + instanceName
- + " is running. Please stop it before installing any libraries.");
+ throw new IllegalStateException(
+ "Instance " + instanceName + " is running. Please stop it before installing any libraries.");
}
String libPathSuffix = CONF_DIR_REL + instanceFolder + "library" + Path.SEPARATOR + libDataverse
+ Path.SEPARATOR;
@@ -714,6 +716,7 @@
/**
* Finds the minimal classes and JARs needed to start the AM only.
+ *
* @return Resources the AM needs to start on the initial container.
* @throws IllegalStateException
* @throws IOException
@@ -771,7 +774,9 @@
/**
     * Uploads an AsterixDB cluster configuration to HDFS for the AM to use.
- * @param overwrite Overwrite existing configurations by the same name.
+ *
+ * @param overwrite
+ * Overwrite existing configurations by the same name.
* @throws IllegalStateException
* @throws IOException
*/
@@ -791,6 +796,7 @@
/**
* Uploads binary resources to HDFS for use by the AM
+ *
* @return
* @throws IOException
* @throws YarnException
@@ -862,7 +868,7 @@
/**
* Submits the request to start the AsterixApplicationMaster to the YARN ResourceManager.
- *
+ *
* @param app
* The application attempt handle.
* @param resources
@@ -954,14 +960,11 @@
}
if (mode == Mode.DESTROY) {
vargs.add("-obliterate");
- }
- else if (mode == Mode.BACKUP) {
+ } else if (mode == Mode.BACKUP) {
vargs.add("-backup");
- }
- else if (mode == Mode.RESTORE) {
+ } else if (mode == Mode.RESTORE) {
vargs.add("-restore " + snapName);
- }
- else if( mode == Mode.INSTALL){
+ } else if (mode == Mode.INSTALL) {
vargs.add("-initial ");
}
if (refresh) {
@@ -1036,8 +1039,11 @@
/**
* Asks YARN to kill a given application by appId
- * @param appId The application to kill.
- * @param yarnClient The YARN client object that is connected to the RM.
+ *
+ * @param appId
+ * The application to kill.
+ * @param yarnClient
+ * The YARN client object that is connected to the RM.
* @throws YarnException
* @throws IOException
*/
@@ -1064,11 +1070,11 @@
/**
* Tries to stop a running AsterixDB instance gracefully.
+ *
* @throws IOException
* @throws YarnException
*/
- private void stopInstanceIfRunning()
- throws IOException, YarnException {
+ private void stopInstanceIfRunning() throws IOException, YarnException {
FileSystem fs = FileSystem.get(conf);
String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
@@ -1086,20 +1092,24 @@
/**
* Start a YARN job to delete local AsterixDB resources of an extant instance
- * @param app The Client connection
- * @param resources AM resources
+ *
+ * @param app
+ * The Client connection
+ * @param resources
+ * AM resources
* @throws IOException
* @throws YarnException
*/
- private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
- YarnException {
+ private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+ throws IOException, YarnException {
FileSystem fs = FileSystem.get(conf);
//if the instance is up, fix that
stopInstanceIfRunning();
//now try deleting all of the on-disk artifacts on the cluster
ApplicationId deleter = deployAM(app, resources, Mode.DESTROY);
- boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start", ccRestPort);
+ boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start",
+ ccRestPort);
if (!delete_start) {
if (force) {
fs.delete(new Path(CONF_DIR_REL + instanceFolder), true);
@@ -1121,19 +1131,20 @@
/**
* Start a YARN job to copy all data-containing resources of an AsterixDB instance to HDFS
+ *
* @param app
* @param resources
* @throws IOException
* @throws YarnException
*/
- private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
- YarnException {
+ private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+ throws IOException, YarnException {
stopInstanceIfRunning();
ApplicationId backerUpper = deployAM(app, resources, Mode.BACKUP);
boolean backupStart;
- backupStart = Utils.waitForApplication(backerUpper, yarnClient, "Waiting for backup " + backerUpper.toString()
- + "to start", ccRestPort);
+ backupStart = Utils.waitForApplication(backerUpper, yarnClient,
+ "Waiting for backup " + backerUpper.toString() + "to start", ccRestPort);
if (!backupStart) {
LOG.fatal("Backup failed to start");
throw new YarnException();
@@ -1149,17 +1160,19 @@
/**
* Start a YARN job to copy a set of resources from backupInstance to restore the state of an extant AsterixDB instance
+ *
* @param app
* @param resources
* @throws IOException
* @throws YarnException
*/
- private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
- YarnException {
+ private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+ throws IOException, YarnException {
stopInstanceIfRunning();
ApplicationId restorer = deployAM(app, resources, Mode.RESTORE);
- boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start", ccRestPort);
+ boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start",
+ ccRestPort);
if (!restoreStart) {
LOG.fatal("Restore failed to start");
throw new YarnException();
@@ -1174,7 +1187,7 @@
/**
* Stops the instance and remove the lockfile to allow a restart.
- *
+ *
* @throws IOException
* @throws JAXBException
* @throws YarnException
@@ -1194,7 +1207,7 @@
}
try {
String ccIp = Utils.getCCHostname(instanceName, conf);
- Utils.sendShutdownCall(ccIp,ccRestPort);
+ Utils.sendShutdownCall(ccIp, ccRestPort);
} catch (IOException e) {
LOG.error("Error while trying to issue safe shutdown:", e);
}
@@ -1207,7 +1220,7 @@
AsterixYARNClient.killApplication(appId, yarnClient);
completed = true;
} catch (YarnException e1) {
- LOG.fatal("Could not stop nor kill instance gracefully.",e1);
+ LOG.fatal("Could not stop nor kill instance gracefully.", e1);
return;
}
}
@@ -1278,11 +1291,12 @@
/**
* Locate the Asterix parameters file.
- * @return
+ *
+ * @return
* @throws FileNotFoundException
* @throws IOException
*/
- private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException{
+ private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException {
AsterixConfiguration configuration;
String configPathBase = MERGED_PARAMETERS_PATH;
if (baseConfig != null) {
@@ -1299,14 +1313,14 @@
/**
*
*/
- private void readConfigParams(AsterixConfiguration configuration){
+ private void readConfigParams(AsterixConfiguration configuration) {
//this is the "base" config that is inside the zip, we start here
for (org.apache.asterix.common.configuration.Property property : configuration.getProperty()) {
if (property.getName().equalsIgnoreCase(CC_JAVA_OPTS_KEY)) {
ccJavaOpts = property.getValue();
} else if (property.getName().equalsIgnoreCase(NC_JAVA_OPTS_KEY)) {
ncJavaOpts = property.getValue();
- } else if(property.getName().equalsIgnoreCase(CC_REST_PORT_KEY)){
+ } else if (property.getName().equalsIgnoreCase(CC_REST_PORT_KEY)) {
ccRestPort = Integer.parseInt(property.getValue());
}
@@ -1315,6 +1329,7 @@
/**
* Retrieves necessary information from the cluster configuration and splices it into the Asterix configuration parameters
+ *
* @param cluster
* @throws FileNotFoundException
* @throws IOException
@@ -1332,14 +1347,20 @@
configuration.setVersion(version);
configuration.setInstanceName(asterixInstanceName);
- String storeDir = null;
List<Store> stores = new ArrayList<Store>();
+ String storeDir = cluster.getStore().trim();
for (Node node : cluster.getNode()) {
- storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
- stores.add(new Store(node.getId(), storeDir));
+ String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+            String[] nodeIoDevices = iodevices.split(",");
+            StringBuilder nodeStores = new StringBuilder();
+            for (int i = 0; i < nodeIoDevices.length; i++) {
+                nodeStores.append(nodeIoDevices[i]).append(File.separator).append(storeDir).append(',');
+            }
+ }
+ //remove last comma
+ nodeStores.deleteCharAt(nodeStores.length() - 1);
+ stores.add(new Store(node.getId(), nodeStores.toString()));
}
configuration.setStore(stores);
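The loop above expands the single instance-level store directory across each node's IO devices, so with iodevices "/asterix/d1,/asterix/d2" and store "storage" (illustrative values) a node's entry becomes:

    new Store(node.getId(), "/asterix/d1/storage,/asterix/d2/storage")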
-
List<Coredump> coredump = new ArrayList<Coredump>();
String coredumpDir = null;
List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
@@ -1354,7 +1375,6 @@
+ File.separator));
}
configuration.setMetadataNode(metadataNodeId);
-
configuration.setCoredump(coredump);
configuration.setTransactionLogDir(txnLogDirs);
FileOutputStream os = new FileOutputStream(MERGED_PARAMETERS_PATH);