Merge branch 'master' into westmann/locks
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java
index 94c074f..1d6b1c3 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java
@@ -16,6 +16,8 @@
import java.util.Map;
import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.asterix.aql.base.Statement;
import edu.uci.ics.asterix.aql.expression.DatasetDecl;
@@ -30,6 +32,7 @@
import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -39,10 +42,38 @@
*/
public abstract class AbstractAqlTranslator {
+ protected static final Logger LOGGER = Logger.getLogger(AbstractAqlTranslator.class.getName());
+
protected static final Map<String, BuiltinType> builtinTypeMap = AsterixBuiltinTypeMap.getBuiltinTypes();
public void validateOperation(Dataverse defaultDataverse, Statement stmt) throws AsterixException {
+ // Give the cluster a bounded amount of time to become ACTIVE before
+ // rejecting the statement: poll the cluster state once per second, for at
+ // most getMaxWaitClusterActive() polling cycles.
+ if (!AsterixClusterProperties.INSTANCE.getState().equals(AsterixClusterProperties.State.ACTIVE)) {
+ int maxWaitCycles = AsterixAppContextInfo.getInstance().getExternalProperties().getMaxWaitClusterActive();
+ int waitCycleCount = 0;
+ try {
+ while (!AsterixClusterProperties.INSTANCE.getState().equals(AsterixClusterProperties.State.ACTIVE)
+ && waitCycleCount < maxWaitCycles) {
+ Thread.sleep(1000);
+ waitCycleCount++;
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt status: catching InterruptedException clears it,
+ // and code further up the stack must still be able to observe the interruption.
+ Thread.currentThread().interrupt();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Thread interrupted while waiting for cluster to be "
+ + AsterixClusterProperties.State.ACTIVE);
+ }
+ }
+ // Re-check the state after waiting (or after being interrupted): the
+ // statement may only proceed once the cluster is ACTIVE.
+ if (!AsterixClusterProperties.INSTANCE.getState().equals(AsterixClusterProperties.State.ACTIVE)) {
+ throw new AsterixException(" Asterix Cluster is in " + AsterixClusterProperties.State.UNUSABLE
+ + " state." + "\n One or more Node Controllers have left or haven't joined yet.\n");
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Cluster is now " + AsterixClusterProperties.State.ACTIVE);
+ }
+ }
+ }
+
+
if (AsterixClusterProperties.INSTANCE.getState().equals(AsterixClusterProperties.State.UNUSABLE)) {
throw new AsterixException(" Asterix Cluster is in " + AsterixClusterProperties.State.UNUSABLE + " state."
+ "\n One or more Node Controllers have left.\n");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 96d0617..3502abb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -129,16 +129,26 @@
isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
if (isMetadataNode) {
- registerRemoteMetadataNode(proxy);
-
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Bootstrapping metadata");
}
- MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
- MetadataManager.INSTANCE.init();
+ MetadataNode.INSTANCE.initialize(runtimeContext);
+
+ // Special case: hand the local MetadataNode instance directly to the
+ // MetadataManager. This lets us delay exporting the metadata node and
+ // registering it with the proxy until it is completely initialized.
+ MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
systemState == SystemState.NEW_UNIVERSE);
MetadataBootstrap.startDDLRecovery();
+
+ IMetadataNode stub = null;
+ stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, 0);
+ proxy.setMetadataNode(stub);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Metadata node bound");
+ }
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -166,17 +176,6 @@
// reclaim storage for orphaned index artifacts in NCs.
}
- public void registerRemoteMetadataNode(IAsterixStateProxy proxy) throws RemoteException {
- IMetadataNode stub = null;
- MetadataNode.INSTANCE.initialize(runtimeContext);
- stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, 0);
- proxy.setMetadataNode(stub);
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Metadata node bound");
- }
- }
-
/**
* Shutdown hook that invokes {@link NCApplicationEntryPoint#stop() stop} method.
*/
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 7a91205..d798cd5 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -25,6 +25,15 @@
<ncId>nc2</ncId>
<txnLogDirPath>target/txnLogDir/nc2</txnLogDirPath>
</transactionLogDir>
+
+ <property>
+ <name>max.wait.active.cluster</name>
+ <value>60</value>
+ <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all nodes are available)
+ before a submitted query/statement can be executed. (Default = 60 seconds)
+ </description>
+ </property>
+
<property>
<name>log.level</name>
<value>WARNING</value>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
index e975f60..674263f 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
@@ -27,6 +27,9 @@
private static final String EXTERNAL_APISERVER_KEY = "api.port";
private static int EXTERNAL_APISERVER_DEFAULT = 19002;
+ private static final String EXTERNAL_MAX_WAIT_FOR_ACTIVE_CLUSTER = "max.wait.active.cluster";
+ private static int EXTERNAL_MAX_WAIT_FOR_ACTIVE_CLUSTER_DEFAULT = 60;
+
public AsterixExternalProperties(AsterixPropertiesAccessor accessor) {
super(accessor);
}
@@ -45,4 +48,9 @@
return accessor.getProperty(EXTERNAL_LOGLEVEL_KEY, EXTERNAL_LOGLEVEL_DEFAULT,
PropertyInterpreters.getLevelPropertyInterpreter());
}
+
+ /**
+ * Looks up the {@code max.wait.active.cluster} property through the accessor,
+ * passing {@code EXTERNAL_MAX_WAIT_FOR_ACTIVE_CLUSTER_DEFAULT} as the default
+ * value and interpreting the property as an integer.
+ *
+ * @return the maximum wait for the cluster to become ACTIVE
+ */
+ public int getMaxWaitClusterActive() {
+ final int maxWaitClusterActive = accessor.getProperty(EXTERNAL_MAX_WAIT_FOR_ACTIVE_CLUSTER,
+ EXTERNAL_MAX_WAIT_FOR_ACTIVE_CLUSTER_DEFAULT, PropertyInterpreters.getIntegerPropertyInterpreter());
+ return maxWaitClusterActive;
+ }
}
diff --git a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
index 3fec2c8..e47dfdb 100644
--- a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
+++ b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
@@ -422,26 +422,7 @@
break;
case "txnqar": //qar represents query after recovery
try {
- ////////////// <begin of temporary fix> ////////////////////////////
- //TODO
- //Temporary fix in order not to block the build test(mvn verify)
- //A proper fix should not have the while loop here.
- int maxRetryCount = 12;
- int tryCount = 0;
- InputStream resultStream = null;
- long sleepTime = 5;
-
- do {
- //wait until NC starts
- sleepTime *= 2;
- Thread.sleep(sleepTime);
- if (++tryCount > maxRetryCount) {
- LOGGER.info("Metadata node is not running - this test will fail.");
- break;
- }
- resultStream = executeQuery(statement);
- } while (resultStream.toString().contains("Connection refused to host"));
- ////////////// <end of temporary fix> //////////////////////////////
+ InputStream resultStream = executeQuery(statement);
qarFile = new File(actualPath + File.separator
+ testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index cbee8f8..6f96191 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -27,6 +27,14 @@
</description>
</property>
+ <property>
+ <name>max.wait.active.cluster</name>
+ <value>60</value>
+ <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all nodes are available)
+ before a submitted query/statement can be executed. (Default = 60 seconds)
+ </description>
+ </property>
+
<property>
<name>storage.buffercache.pagesize</name>
<value>131072</value>
diff --git a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/transaction/RecoveryIT.java b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/transaction/RecoveryIT.java
index 89cbec8..e8fd61f 100644
--- a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/transaction/RecoveryIT.java
+++ b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/transaction/RecoveryIT.java
@@ -69,6 +69,7 @@
}
})[0];
managixHomePath = new File(installerTargetPath, managixHomeDirName).getAbsolutePath();
+ LOGGER.info("MANAGIX_HOME=" + managixHomePath);
String fileListPath = asterixInstallerPath.getAbsolutePath() + File.separator + "src" + File.separator + "test"
+ File.separator + "resources" + File.separator + "transactionts" + File.separator + "data"
@@ -76,6 +77,7 @@
String srcBasePath = asterixAppPath.getAbsolutePath();
String destBasePath = managixHomePath + File.separator + "clusters" + File.separator + "local" + File.separator
+ "working_dir";
+ LOGGER.info("working dir: " + destBasePath);
prepareDataFiles(fileListPath, srcBasePath, destBasePath);
pb = new ProcessBuilder();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index a1afd89..1f65f7f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -71,6 +71,9 @@
* with transaction ids of regular jobs or other metadata transactions.
*/
public class MetadataManager implements IMetadataManager {
+ private static final int INITIAL_SLEEP_TIME = 64;
+ private static final int RETRY_MULTIPLIER = 4;
+ private static final int MAX_RETRY_COUNT = 6;
// Set in init().
public static MetadataManager INSTANCE;
private final MetadataCache cache = new MetadataCache();
@@ -89,14 +92,37 @@
this.metadataLatch = new ReentrantReadWriteLock(true);
}
+ /**
+ * Creates a MetadataManager around an already available metadata node.
+ * Because the node is set here, {@link #init()} returns immediately instead
+ * of fetching the node from the proxy. Instances built this way carry no
+ * metadata properties ({@code metadataProperties} is {@code null}).
+ *
+ * @param proxy the Asterix state proxy
+ * @param metadataNode the metadata node to use; must not be {@code null}
+ * @throws Error if {@code metadataNode} is {@code null}
+ */
+ public MetadataManager(IAsterixStateProxy proxy, IMetadataNode metadataNode) {
+ // Reject a missing node up front, before any field is assigned.
+ if (null == metadataNode) {
+ throw new Error("Null metadataNode given to MetadataManager.");
+ }
+ this.metadataNode = metadataNode;
+ this.proxy = proxy;
+ this.metadataLatch = new ReentrantReadWriteLock(true);
+ // No metadata properties are supplied through this constructor.
+ this.metadataProperties = null;
+ }
+
@Override
- public void init() throws RemoteException {
+ public void init() throws RemoteException, MetadataException {
// Could be synchronized on any object. Arbitrarily chose proxy.
synchronized (proxy) {
if (metadataNode != null) {
return;
}
- metadataNode = proxy.getMetadataNode();
+ try {
+ int retry = 0;
+ int sleep = INITIAL_SLEEP_TIME;
+ while (retry++ < MAX_RETRY_COUNT) {
+ metadataNode = proxy.getMetadataNode();
+ if (metadataNode != null) {
+ break;
+ }
+ Thread.sleep(sleep);
+ sleep *= RETRY_MULTIPLIER;
+ }
+ } catch (InterruptedException e) {
+ throw new MetadataException(e);
+ }
if (metadataNode == null) {
throw new Error("Failed to get the MetadataNode.\n" + "The MetadataNode was configured to run on NC: "
+ metadataProperties.getMetadataNodeName());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 9a35fb0..a2492fa 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -1005,9 +1005,9 @@
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
- ITupleReference ref = rangeCursor.getTuple();
- Dataset ds = valueExtractor.getValue(jobId, rangeCursor.getTuple());
- datasetId = ((Dataset) valueExtractor.getValue(jobId, rangeCursor.getTuple())).getDatasetId();
+ final ITupleReference ref = rangeCursor.getTuple();
+ final Dataset ds = valueExtractor.getValue(jobId, ref);
+ datasetId = ds.getDatasetId();
if (mostRecentDatasetId < datasetId) {
mostRecentDatasetId = datasetId;
}