Refactor Messaging
In this refactoring, each message implementation includes
a handle method. This avoids bloating of message brokers
and enable better extensibility for messaging.
Change-Id: I7c918bf504058c98ecf89f5b019503278e9aa01f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1128
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
similarity index 94%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
index fe15ce8..06e9ad1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.app.external;
+package org.apache.asterix.active;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -24,9 +24,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index a6e1788..50fa257 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -20,7 +20,12 @@
import java.io.Serializable;
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
public class ActiveManagerMessage extends AbstractApplicationMessage {
public static final byte STOP_ACTIVITY = 0x00;
@@ -36,11 +41,6 @@
this.payload = payload;
}
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.ACTIVE_MANAGER_MESSAGE;
- }
-
public Serializable getPayload() {
return payload;
}
@@ -52,4 +52,17 @@
public String getSrc() {
return src;
}
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAsterixAppRuntimeContext appContext =
+ (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ ((ActiveManager) appContext.getActiveManager()).submit(this);
+ }
+
+ @Override
+ public String type() {
+ return "ACTIVE_MANAGER_MESSAGE";
+ }
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index f5bdf39..02affc4 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -20,9 +20,12 @@
import java.io.Serializable;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.service.IControllerService;
public class ActivePartitionMessage extends AbstractApplicationMessage {
@@ -45,11 +48,6 @@
this.payload = payload;
}
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.ACTIVE_ENTITY_TO_CC_MESSAGE;
- }
-
public ActiveRuntimeId getActiveRuntimeId() {
return activeRuntimeId;
}
@@ -65,4 +63,14 @@
public byte getEvent() {
return event;
}
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ ActiveLifecycleListener.INSTANCE.receive(this);
+ }
+
+ @Override
+ public String type() {
+ return "ACTIVE_ENTITY_TO_CC_MESSAGE";
+ }
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 4e1d9b0..ec2b41e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -39,13 +40,12 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -108,7 +108,7 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
AbstractUnnestMapOperator unnestMapOp = (AbstractUnnestMapOperator) op;
ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
@@ -258,7 +258,7 @@
.getBinaryTokenizerFactory(searchModifierType, searchKeyType, secondaryIndex);
IIndexDataflowHelperFactory dataflowHelperFactory;
- AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
boolean temp = dataset.getDatasetDetails().isTemp();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index bf7b975..10b601b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -37,8 +37,8 @@
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.utils.Pair;
/**
@@ -55,7 +55,7 @@
if (!(AsterixClusterProperties.INSTANCE.getState().equals(ClusterState.ACTIVE)
&& AsterixClusterProperties.INSTANCE.isGlobalRecoveryCompleted())) {
- int maxWaitCycles = AsterixAppContextInfo.getInstance().getExternalProperties().getMaxWaitClusterActive();
+ int maxWaitCycles = AsterixAppContextInfo.INSTANCE.getExternalProperties().getMaxWaitClusterActive();
int waitCycleCount = 0;
try {
while (!AsterixClusterProperties.INSTANCE.getState().equals(ClusterState.ACTIVE)
@@ -84,7 +84,7 @@
}
if (!AsterixClusterProperties.INSTANCE.isGlobalRecoveryCompleted()) {
- int maxWaitCycles = AsterixAppContextInfo.getInstance().getExternalProperties().getMaxWaitClusterActive();
+ int maxWaitCycles = AsterixAppContextInfo.INSTANCE.getExternalProperties().getMaxWaitClusterActive();
int waitCycleCount = 0;
try {
while (!AsterixClusterProperties.INSTANCE.isGlobalRecoveryCompleted()
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index ed9c1e6..56a5a57 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -93,8 +93,8 @@
import org.apache.asterix.om.functions.AsterixFunctionInfo;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
@@ -656,7 +656,7 @@
String outputDir = System.getProperty("java.io.tmpDir");
String filePath = outputDir + System.getProperty("file.separator") + OUTPUT_FILE_PREFIX
+ outputFileID.incrementAndGet();
- AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
+ AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.INSTANCE.getMetadataProperties();
return new FileSplit(metadataProperties.getMetadataNodeName(), new FileReference(new File(filePath)));
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 5728947..76d0245 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -51,9 +51,9 @@
import org.apache.asterix.lang.common.statement.FunctionDecl;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.optimizer.base.RuleCollections;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -239,13 +239,13 @@
}
//print the plot for the logical plan
- AsterixExternalProperties xProps = AsterixAppContextInfo.getInstance().getExternalProperties();
+ AsterixExternalProperties xProps = AsterixAppContextInfo.INSTANCE.getExternalProperties();
Boolean plot = xProps.getIsPlottingEnabled();
if (plot) {
PlanPlotter.printLogicalPlan(plan);
}
- AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+ AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.INSTANCE.getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
int sortFrameLimit = (int) (compilerProperties.getSortMemorySize() / frameSize);
int groupFrameLimit = (int) (compilerProperties.getGroupMemorySize() / frameSize);
@@ -340,7 +340,7 @@
JobEventListenerFactory jobEventListenerFactory =
new JobEventListenerFactory(asterixJobId, queryMetadataProvider.isWriteTransaction());
- JobSpecification spec = compiler.createJob(AsterixAppContextInfo.getInstance(), jobEventListenerFactory);
+ JobSpecification spec = compiler.createJob(AsterixAppContextInfo.INSTANCE, jobEventListenerFactory);
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
printPlanPrefix(conf, "Hyracks job");
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
index eb23902..c7cf1ea 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
@@ -26,7 +26,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.asterix.app.result.ResultUtil;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.json.JSONException;
import org.json.JSONObject;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryWebInterfaceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryWebInterfaceServlet.java
index 92c8e8a..524c87f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryWebInterfaceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryWebInterfaceServlet.java
@@ -18,10 +18,6 @@
*/
package org.apache.asterix.api.http.servlet;
-import org.apache.asterix.common.config.AsterixExternalProperties;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.commons.io.IOUtils;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -34,10 +30,11 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.asterix.common.config.AsterixExternalProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
import org.codehaus.jettison.json.JSONObject;
public class QueryWebInterfaceServlet extends HttpServlet {
@@ -112,7 +109,7 @@
public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
response.setCharacterEncoding("utf-8");
response.setContentType("application/json");
- AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+ AsterixExternalProperties externalProperties = AsterixAppContextInfo.INSTANCE.getExternalProperties();
JSONObject obj = new JSONObject();
try {
PrintWriter out = response.getWriter();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
index dd2f799..c786dc7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.api.http.servlet;
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
@@ -30,12 +32,10 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.json.JSONObject;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-
public class ShutdownAPIServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@@ -60,7 +60,7 @@
try {
jsonObject.put("status", "SHUTTING_DOWN");
jsonObject.put("date", new Date());
- jsonObject.put("cluster" , AsterixClusterProperties.INSTANCE.getClusterStateDescription());
+ jsonObject.put("cluster", AsterixClusterProperties.INSTANCE.getClusterStateDescription());
final PrintWriter writer = response.getWriter();
writer.print(jsonObject.toString(4));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
index a426d50..20ecaa5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
@@ -27,7 +27,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.json.JSONObject;
import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/AsterixResourceIdManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/AsterixResourceIdManager.java
new file mode 100644
index 0000000..a773bab
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/AsterixResourceIdManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.cc;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+
+public class AsterixResourceIdManager implements IAsterixResourceIdManager {
+
+ private final AtomicLong globalResourceId = new AtomicLong();
+ private volatile Set<String> reportedNodes = new HashSet<>();
+ private volatile boolean allReported = false;
+
+ @Override
+ public long createResourceId() {
+ if (!allReported) {
+ synchronized (this) {
+ if (!allReported) {
+ if (reportedNodes.size() < AsterixClusterProperties.getNumberOfNodes()) {
+ return -1;
+ } else {
+ reportedNodes = null;
+ allReported = true;
+ }
+ }
+ }
+ }
+ return globalResourceId.incrementAndGet();
+ }
+
+ @Override
+ public synchronized boolean reported(String nodeId) {
+ return allReported || reportedNodes.contains(nodeId);
+ }
+
+ @Override
+ public synchronized void report(String nodeId, long maxResourceId) {
+ if (!allReported) {
+ globalResourceId.set(Math.max(maxResourceId, globalResourceId.get()));
+ reportedNodes.add(nodeId);
+ if (reportedNodes.size() == AsterixClusterProperties.getNumberOfNodes()) {
+ reportedNodes = null;
+ allReported = true;
+ }
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
index 6922379..c8a9566 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
@@ -38,6 +38,7 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
@@ -68,12 +69,11 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
import org.apache.hadoop.conf.Configuration;
@@ -212,7 +212,7 @@
ArrayList<ExternalFile> externalFilesSnapshot, AqlMetadataProvider metadataProvider, boolean createIndex)
throws MetadataException, AlgebricksException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+ IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
metadataProvider.getMetadataTxnContext());
@@ -226,8 +226,9 @@
ILocalResourceMetadata localResourceMetadata = new ExternalBTreeLocalResourceMetadata(
filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, filesIndexDescription.FILES_INDEX_COMP_FACTORIES,
new int[] { 0 }, false, dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties);
- PersistentLocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
- localResourceMetadata, LocalResource.ExternalBTreeResource);
+ PersistentLocalResourceFactoryProvider localResourceFactoryProvider =
+ new PersistentLocalResourceFactoryProvider(
+ localResourceMetadata, LocalResource.ExternalBTreeResource);
ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
mergePolicyFactory, mergePolicyFactoryProperties,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
@@ -257,9 +258,10 @@
* @throws AsterixException
* @throws Exception
*/
- private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator(
- AqlMetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset,
- List<ExternalFile> files, RecordDescriptor indexerDesc) throws AsterixException {
+ private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>
+ getExternalDataIndexingOperator(
+ AqlMetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset,
+ List<ExternalFile> files, RecordDescriptor indexerDesc) throws AsterixException {
ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
Map<String, String> configuration = externalDatasetDetails.getProperties();
IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
@@ -406,7 +408,7 @@
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
.splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true);
- AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
metadataProvider.getMetadataTxnContext());
IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
@@ -480,7 +482,7 @@
public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
throws AlgebricksException, AsterixException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+ IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
metadataProvider.getMetadataTxnContext());
@@ -571,8 +573,10 @@
boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
int numNestedSecondaryKeyFields = numDimensions * 2;
- IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
- IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+ IPrimitiveValueProviderFactory[] valueProviderFactories =
+ new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+ IBinaryComparatorFactory[] secondaryComparatorFactories =
+ new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+ numNestedSecondaryKeyFields];
@@ -613,7 +617,7 @@
public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
throws AlgebricksException, AsterixException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+ IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
metadataProvider.getMetadataTxnContext());
@@ -671,7 +675,7 @@
public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
throws AlgebricksException, AsterixException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+ IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
metadataProvider.getMetadataTxnContext());
@@ -728,7 +732,7 @@
public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, AqlMetadataProvider metadataProvider)
throws MetadataException, AlgebricksException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+ IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
metadataProvider.getMetadataTxnContext());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index aab1aa3..110ace6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -42,7 +42,7 @@
import org.apache.asterix.file.JobSpecificationUtils;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index b815602..7f31d20 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -36,7 +36,7 @@
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.statement.DataverseDecl;
import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -92,7 +92,7 @@
statements.add(dataverseDecl);
statements.add(subscribeStmt);
IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider);
- translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+ translator.compileAndExecute(AsterixAppContextInfo.INSTANCE.getHcc(), null,
QueryTranslator.ResultDelivery.SYNC);
if (LOGGER.isEnabledFor(Level.INFO)) {
LOGGER.info("Submitted connection requests for execution: " + request);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index 38d7da3..077600a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -51,6 +51,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
@@ -59,20 +60,19 @@
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.replication.management.ReplicationChannel;
import org.apache.asterix.replication.management.ReplicationManager;
import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.GlobalResourceIdFactoryProvider;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.hyracks.api.application.IApplicationConfig;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
index 99235be..e151963 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.app.result;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
@@ -39,7 +39,7 @@
// Number of parallel result reader buffers
public static final int NUM_READERS = 1;
- public static final int FRAME_SIZE = AsterixAppContextInfo.getInstance().getCompilerProperties().getFrameSize();
+ public static final int FRAME_SIZE = AsterixAppContextInfo.INSTANCE.getCompilerProperties().getFrameSize();
public ResultReader(IHyracksDataset hdc) {
hyracksDataset = hdc;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 727a1f9..b430807 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -154,9 +154,9 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
@@ -740,9 +740,9 @@
} else {
nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
}
- List<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+ List<String> nodeNames = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames();
List<String> nodeNamesClone = new ArrayList<>(nodeNames);
- String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+ String metadataNodeName = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
List<String> selectedNodes = new ArrayList<>();
selectedNodes.add(metadataNodeName);
numChosen++;
@@ -2901,7 +2901,7 @@
if (pregelixHome == null) {
// Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
// pregelixHome can never be null.
- pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
+ pregelixHome = AsterixAppContextInfo.INSTANCE.getCompilerProperties().getPregelixHome();
}
// Constructs the pregelix command line.
@@ -3024,7 +3024,7 @@
protected List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName,
String fromDatasetName, String toDataverseName, String toDatasetName) {
// Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
- AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+ AsterixExternalProperties externalProperties = AsterixAppContextInfo.INSTANCE.getExternalProperties();
AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE;
String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp();
StringBuilder asterixdbParameterBuilder = new StringBuilder();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index 46b9c35..8798817 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -38,11 +39,10 @@
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -114,7 +114,7 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
.splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, datasetName,
temp);
- AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
metadataProvider.getMetadataTxnContext());
@@ -180,7 +180,7 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
metadata.getMetadataTxnContext());
- AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
//prepare a LocalResourceMetadata which will be stored in NC's local resource repository
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first,
@@ -237,7 +237,7 @@
int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
- AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
metadata.getMetadataTxnContext());
LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
index 6ed8db9..3696a36 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -26,15 +26,15 @@
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
@@ -103,7 +103,7 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
.splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
- AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
index d6bfbaa..2a4ab04 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
@@ -19,12 +19,12 @@
package org.apache.asterix.file;
import org.apache.asterix.common.config.AsterixCompilerProperties;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.hyracks.api.job.JobSpecification;
public class JobSpecificationUtils {
public static JobSpecification createJobSpecification() {
- AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+ AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.INSTANCE.getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
JobSpecification spec = new JobSpecification(frameSize);
return spec;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
index 9e2e0b0..d257dcd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -41,7 +42,6 @@
import org.apache.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
index 8ebe246..4eced73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.context.TransactionSubsystemProvider;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.external.indexing.ExternalFile;
@@ -51,15 +52,14 @@
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -158,7 +158,7 @@
boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType metaType,
List<Integer> keySourceIndicators, ARecordType enforcedType) throws AsterixException, AlgebricksException {
- IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+ IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
SecondaryIndexOperationsHelper indexOperationsHelper = null;
switch (indexType) {
case BTREE: {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
index c2d2f7c..2aa6d97 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.om.types.IAType;
@@ -35,7 +36,6 @@
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
index cc9675d..d9a7e4e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
@@ -46,7 +47,6 @@
import org.apache.asterix.transaction.management.resource.ExternalRTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 6a2456d..5b36782 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.hyracks.bootstrap;
+import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
@@ -25,6 +28,7 @@
import javax.servlet.Servlet;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.api.http.servlet.APIServlet;
import org.apache.asterix.api.http.servlet.AQLAPIServlet;
import org.apache.asterix.api.http.servlet.ClusterAPIServlet;
@@ -39,8 +43,8 @@
import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
import org.apache.asterix.api.http.servlet.UpdateAPIServlet;
import org.apache.asterix.api.http.servlet.VersionAPIServlet;
+import org.apache.asterix.app.cc.AsterixResourceIdManager;
import org.apache.asterix.app.cc.CompilerExtensionManager;
-import org.apache.asterix.app.external.ActiveLifecycleListener;
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -56,8 +60,8 @@
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
import org.apache.asterix.metadata.cluster.ClusterManager;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.HyracksConnection;
@@ -69,9 +73,6 @@
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-
public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
@@ -94,11 +95,12 @@
appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
GlobalRecoveryManager.instantiate((HyracksConnection) getNewHyracksClientConnection());
ILibraryManager libraryManager = new ExternalLibraryManager();
+ AsterixResourceIdManager resourceIdManager = new AsterixResourceIdManager();
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.instance(),
- libraryManager);
+ libraryManager, resourceIdManager);
ccExtensionManager = new CompilerExtensionManager(getExtensions());
- AsterixAppContextInfo.getInstance().setExtensionManager(ccExtensionManager);
+ AsterixAppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
if (System.getProperty("java.rmi.server.hostname") == null) {
System.setProperty("java.rmi.server.hostname",
@@ -108,10 +110,10 @@
setAsterixStateProxy(AsterixStateProxy.registerRemoteObject());
appCtx.setDistributedState(proxy);
- AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
+ AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.INSTANCE.getMetadataProperties();
MetadataManager.instantiate(new MetadataManager(proxy, metadataProperties));
- AsterixAppContextInfo.getInstance().getCCApplicationContext()
+ AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
.addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
servers = configureServers();
@@ -127,11 +129,11 @@
}
protected List<AsterixExtension> getExtensions() {
- return AsterixAppContextInfo.getInstance().getExtensionProperties().getExtensions();
+ return AsterixAppContextInfo.INSTANCE.getExtensionProperties().getExtensions();
}
protected List<Server> configureServers() throws Exception {
- AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+ AsterixExternalProperties externalProperties = AsterixAppContextInfo.INSTANCE.getExternalProperties();
List<Server> serverList = new ArrayList<>();
serverList.add(setupWebServer(externalProperties));
@@ -190,7 +192,7 @@
IHyracksClientConnection hcc = getNewHyracksClientConnection();
context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
- context.setAttribute(ASTERIX_BUILD_PROP_ATTR, AsterixAppContextInfo.getInstance());
+ context.setAttribute(ASTERIX_BUILD_PROP_ATTR, AsterixAppContextInfo.INSTANCE);
jsonAPIServer.setHandler(context);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 26f4bc8..5fc91a4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -39,7 +39,7 @@
import org.apache.asterix.metadata.cluster.ClusterManager;
import org.apache.asterix.metadata.cluster.RemoveNodeWork;
import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
public class ClusterLifecycleListener implements IClusterLifecycleListener {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 529d38a..0e3bb3b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -30,7 +30,7 @@
import org.apache.asterix.metadata.cluster.AddNodeWork;
import org.apache.asterix.metadata.cluster.ClusterManager;
import org.apache.asterix.metadata.cluster.RemoveNodeWork;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
public class ClusterWorkExecutor implements Runnable {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index bbd400b..7dde284 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -41,7 +41,7 @@
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index a8bc48f..c11b875 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.event.schema.cluster.Cluster;
@@ -44,8 +45,8 @@
import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCApplicationContext;
@@ -210,8 +211,7 @@
@Override
public void notifyStartupComplete() throws Exception {
//Send max resource id on this NC to the CC
- ((NCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
-
+ ReportMaxResourceIdMessage.send((NodeControllerService) ncApplicationContext.getControllerService());
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMetadataProperties();
if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 27a5365..1345aa7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,27 +18,13 @@
*/
package org.apache.asterix.messaging;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.app.external.ActiveLifecycleListener;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -47,10 +33,7 @@
public class CCMessageBroker implements ICCMessageBroker {
private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
- private final AtomicLong globalResourceId = new AtomicLong(0);
private final ClusterControllerService ccs;
- private final Set<String> nodesReportedMaxResourceId = new HashSet<>();
- public static final long NO_CALLBACK_MESSAGE_ID = -1;
public CCMessageBroker(ClusterControllerService ccs) {
this.ccs = ccs;
@@ -60,63 +43,9 @@
public void receivedMessage(IMessage message, String nodeId) throws Exception {
AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Received message: " + absMessage.getMessageType().name());
+ LOGGER.info("Received message: " + absMessage.type());
}
- switch (absMessage.getMessageType()) {
- case RESOURCE_ID_REQUEST:
- handleResourceIdRequest(message, nodeId);
- break;
- case REPORT_MAX_RESOURCE_ID_RESPONSE:
- handleReportResourceMaxIdResponse(message, nodeId);
- break;
- case TAKEOVER_PARTITIONS_RESPONSE:
- handleTakeoverPartitionsResponse(message);
- break;
- case TAKEOVER_METADATA_NODE_RESPONSE:
- handleTakeoverMetadataNodeResponse(message);
- break;
- case PREPARE_PARTITIONS_FAILBACK_RESPONSE:
- handleClosePartitionsResponse(message);
- break;
- case COMPLETE_FAILBACK_RESPONSE:
- handleCompleteFailbcakResponse(message);
- break;
- case ACTIVE_ENTITY_TO_CC_MESSAGE:
- handleActiveEntityMessage(message);
- break;
- default:
- LOGGER.warning("Unknown message: " + absMessage.getMessageType());
- break;
- }
- }
-
- private void handleActiveEntityMessage(IMessage message) {
- ActiveLifecycleListener.INSTANCE.receive((ActivePartitionMessage) message);
- }
-
- private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
- ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message;
- ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
- reponse.setId(msg.getId());
- //cluster is not active
- if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
- reponse.setResourceId(-1);
- reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
- } else if (nodesReportedMaxResourceId.size() < AsterixClusterProperties.getNumberOfNodes()) {
- //some node has not reported max resource id
- reponse.setResourceId(-1);
- reponse.setException(new Exception("One or more nodes has not reported max resource id."));
- requestMaxResourceID();
- } else {
- reponse.setResourceId(globalResourceId.incrementAndGet());
- }
- sendApplicationMessageToNC(reponse, nodeId);
- }
-
- private synchronized void handleReportResourceMaxIdResponse(IMessage message, String nodeId) throws Exception {
- ReportMaxResourceIdMessage msg = (ReportMaxResourceIdMessage) message;
- globalResourceId.set(Math.max(msg.getMaxResourceId(), globalResourceId.get()));
- nodesReportedMaxResourceId.add(nodeId);
+ absMessage.handle(ccs);
}
@Override
@@ -125,36 +54,4 @@
NodeControllerState state = nodeMap.get(nodeId);
state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
}
-
- private void requestMaxResourceID() throws Exception {
- //send request to NCs that have not reported their max resource ids
- Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
- ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
- msg.setId(NO_CALLBACK_MESSAGE_ID);
- for (String nodeId : getParticipantNodes) {
- if (!nodesReportedMaxResourceId.contains(nodeId)) {
- sendApplicationMessageToNC(msg, nodeId);
- }
- }
- }
-
- private void handleTakeoverPartitionsResponse(IMessage message) {
- TakeoverPartitionsResponseMessage msg = (TakeoverPartitionsResponseMessage) message;
- AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(msg);
- }
-
- private void handleTakeoverMetadataNodeResponse(IMessage message) {
- TakeoverMetadataNodeResponseMessage msg = (TakeoverMetadataNodeResponseMessage) message;
- AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(msg);
- }
-
- private void handleCompleteFailbcakResponse(IMessage message) {
- CompleteFailbackResponseMessage msg = (CompleteFailbackResponseMessage) message;
- AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(msg);
- }
-
- private void handleClosePartitionsResponse(IMessage message) {
- PreparePartitionsFailbackResponseMessage msg = (PreparePartitionsFailbackResponseMessage) message;
- AsterixClusterProperties.INSTANCE.processPreparePartitionsFailbackResponse(msg);
- }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index c7e4ac8..9851b61 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -27,30 +27,13 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReplicaEventMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.messages.IMessage;
@@ -112,49 +95,16 @@
@Override
public void receivedMessage(IMessage message, String nodeId) throws Exception {
- try {
- AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Received message: " + absMessage.getMessageType().name());
- }
- //if the received message is a response to a sent message, deliver it to the sender
- IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
- if (callback != null) {
- callback.deliverMessageResponse(absMessage);
- }
-
- //handle requests from CC
- switch (absMessage.getMessageType()) {
- case REPORT_MAX_RESOURCE_ID_REQUEST:
- reportMaxResourceId();
- break;
- case TAKEOVER_PARTITIONS_REQUEST:
- handleTakeoverPartitons(message);
- break;
- case TAKEOVER_METADATA_NODE_REQUEST:
- handleTakeoverMetadataNode(message);
- break;
- case PREPARE_PARTITIONS_FAILBACK_REQUEST:
- handlePreparePartitionsFailback(message);
- break;
- case COMPLETE_FAILBACK_REQUEST:
- handleCompleteFailbackRequest(message);
- break;
- case REPLICA_EVENT:
- handleReplicaEvent(message);
- break;
- case ACTIVE_MANAGER_MESSAGE:
- ((ActiveManager) appContext.getActiveManager()).submit((ActiveManagerMessage) message);
- break;
- default:
- break;
- }
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, e.getMessage(), e);
- }
- throw e;
+ AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Received message: " + absMessage.type());
}
+ //if the received message is a response to a sent message, deliver it to the sender
+ IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
+ if (callback != null) {
+ callback.deliverMessageResponse(absMessage);
+ }
+ absMessage.handle(ncs);
}
public ConcurrentFramePool getMessagingFramePool() {
@@ -190,96 +140,6 @@
ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
}
- private void handleTakeoverPartitons(IMessage message) throws Exception {
- TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
- //if the NC is shutting down, it should ignore takeover partitions request
- if (!appContext.isShuttingdown()) {
- try {
- IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
- remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
- } finally {
- //send response after takeover is completed
- TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
- appContext.getTransactionSubsystem().getId(), msg.getPartitions());
- sendMessageToCC(reponse, null);
- }
- }
- }
-
- private void handleTakeoverMetadataNode(IMessage message) throws Exception {
- try {
- appContext.initializeMetadata(false);
- appContext.exportMetadataNodeStub();
- } finally {
- TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
- appContext.getTransactionSubsystem().getId());
- sendMessageToCC(reponse, null);
- }
- }
-
- public void reportMaxResourceId() throws Exception {
- ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
- //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
- long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
- MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
- maxResourceIdMsg.setMaxResourceId(maxResourceId);
- sendMessageToCC(maxResourceIdMsg, null);
- }
-
- private void handleReplicaEvent(IMessage message) {
- ReplicaEventMessage msg = (ReplicaEventMessage) message;
- Node node = new Node();
- node.setId(msg.getNodeId());
- node.setClusterIp(msg.getNodeIPAddress());
- Replica replica = new Replica(node);
- ReplicaEvent event = new ReplicaEvent(replica, msg.getEvent());
- appContext.getReplicationManager().reportReplicaEvent(event);
- }
-
- private void handlePreparePartitionsFailback(IMessage message) throws Exception {
- PreparePartitionsFailbackRequestMessage msg = (PreparePartitionsFailbackRequestMessage) message;
- /**
- * if the metadata partition will be failed back
- * we need to flush and close all datasets including metadata datasets
- * otherwise we need to close all non-metadata datasets and flush metadata datasets
- * so that their memory components will be copied to the failing back node
- */
- if (msg.isReleaseMetadataNode()) {
- appContext.getDatasetLifecycleManager().closeAllDatasets();
- //remove the metadata node stub from RMI registry
- appContext.unexportMetadataNodeStub();
- } else {
- //close all non-metadata datasets
- appContext.getDatasetLifecycleManager().closeUserDatasets();
- //flush the remaining metadata datasets that were not closed
- appContext.getDatasetLifecycleManager().flushAllDatasets();
- }
-
- //mark the partitions to be closed as inactive
- PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
- .getLocalResourceRepository();
- for (Integer partitionId : msg.getPartitions()) {
- localResourceRepo.addInactivePartition(partitionId);
- }
-
- //send response after partitions prepared for failback
- PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
- msg.getRequestId(), msg.getPartitions());
- sendMessageToCC(reponse, null);
- }
-
- private void handleCompleteFailbackRequest(IMessage message) throws Exception {
- CompleteFailbackRequestMessage msg = (CompleteFailbackRequestMessage) message;
- try {
- IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
- remoteRecoeryManager.completeFailbackProcess();
- } finally {
- CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
- msg.getRequestId(), msg.getPartitions());
- sendMessageToCC(reponse, null);
- }
- }
-
private class MessageDeliveryService implements Runnable {
/*
* TODO Currently this thread is not stopped when it is interrupted because
@@ -302,7 +162,7 @@
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING) && msg != null) {
LOGGER.log(Level.WARNING, "Could not process message with id: " + msg.getId() + " and type: "
- + msg.getMessageType().name(), e);
+ + msg.type(), e);
} else {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(Level.WARNING, "Could not process message", e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
index 7536c70..5f99dbf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
@@ -23,9 +23,9 @@
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -44,8 +44,8 @@
public static void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
- throws Exception {
- AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+ throws Exception {
+ AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.INSTANCE.getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
JobSpecification spec = new JobSpecification(frameSize);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
index 015088a..1c190c9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
@@ -32,7 +32,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.asterix.common.config.AsterixBuildProperties;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.test.runtime.ExecutionTest;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.json.JSONObject;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 7e35f11..a548b3a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -36,6 +36,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -51,7 +52,6 @@
import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import org.apache.asterix.transaction.management.service.logging.LogReader;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 3591509..4e8875b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -36,8 +36,8 @@
import org.apache.asterix.event.schema.cluster.MasterNode;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.statement.RunStatement;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.asterix.translator.IStatementExecutor;
import org.junit.Assert;
import org.junit.Test;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index c73be7a..39a4d3b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -31,7 +31,7 @@
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.asterix.testframework.xml.TestSuite;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -84,7 +84,7 @@
List<ILibraryManager> libraryManagers = new ArrayList<>();
// Adds the library manager for CC.
- libraryManagers.add(AsterixAppContextInfo.getInstance().getLibraryManager());
+ libraryManagers.add(AsterixAppContextInfo.INSTANCE.getLibraryManager());
// Adds library managers for NCs, one-per-NC.
for (NodeControllerService nc : integrationUtil.ncs) {
IAsterixAppRuntimeContext runtimeCtx =
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index bfdc834..c01499e 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -263,6 +263,11 @@
<version>2.0.2-beta</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
similarity index 81%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index d986648..b9d187d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -16,31 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.util;
-
-import java.util.Arrays;
+package org.apache.asterix.common.exceptions;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class ExternalDataExceptionUtils {
+public class ExceptionUtils {
public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
public static final String MISSING_PARAMETER = "Missing parameter.\n";
public static final String PARAMETER_NAME = "Parameter name: ";
public static final String EXPECTED_VALUE = "Expected value: ";
public static final String PASSED_VALUE = "Passed value: ";
+ private ExceptionUtils() {
+ }
+
public static String incorrectParameterMessage(String parameterName, String expectedValue, String passedValue) {
- return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + ExternalDataConstants.LF + EXPECTED_VALUE
- + expectedValue + ExternalDataConstants.LF + PASSED_VALUE + passedValue;
- }
-
- public static String concat(String... vals) {
- return Arrays.toString(vals);
- }
-
- // For now, we are accepting all exceptions as resolvable by adapter.
- public static boolean isResolvable(Exception e) {
- return true;
+ return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + System.lineSeparator() + EXPECTED_VALUE
+ + expectedValue + System.lineSeparator() + PASSED_VALUE + passedValue;
}
public static HyracksDataException suppressIntoHyracksDataException(HyracksDataException hde, Throwable th) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
index fbb9b86..5737957 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
@@ -22,7 +22,7 @@
public abstract class AbstractApplicationMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
- private long id;
+ protected long id;
@Override
public void setId(long id) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
deleted file mode 100644
index 6518ae5..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import java.util.Set;
-
-public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
-
- private static final long serialVersionUID = 1L;
- private final Set<Integer> partitions;
- private final String nodeId;
-
- public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
- super(planId, requestId);
- this.nodeId = nodeId;
- this.partitions = partitions;
- }
-
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.COMPLETE_FAILBACK_REQUEST;
- }
-
- public Set<Integer> getPartitions() {
- return partitions;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Plan ID: " + planId);
- sb.append(" Node ID: " + nodeId);
- sb.append(" Partitions: " + partitions);
- return sb.toString();
- }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
deleted file mode 100644
index 02fe917..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import java.util.Set;
-
-public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
-
- private static final long serialVersionUID = 1L;
- private final Set<Integer> partitions;
- private boolean releaseMetadataNode = false;
- private final String nodeID;
-
- public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
- super(planId, requestId);
- this.nodeID = nodeId;
- this.partitions = partitions;
- }
-
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_REQUEST;
- }
-
- public Set<Integer> getPartitions() {
- return partitions;
- }
-
- public boolean isReleaseMetadataNode() {
- return releaseMetadataNode;
- }
-
- public void setReleaseMetadataNode(boolean releaseMetadataNode) {
- this.releaseMetadataNode = releaseMetadataNode;
- }
-
- public String getNodeID() {
- return nodeID;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Plan ID: " + planId);
- sb.append(" Partitions: " + partitions);
- sb.append(" releaseMetadataNode: " + releaseMetadataNode);
- return sb.toString();
- }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
deleted file mode 100644
index bf0219c..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-
-public class ReplicaEventMessage extends AbstractApplicationMessage {
-
- private static final long serialVersionUID = 1L;
- private final String nodeId;
- private final ClusterEventType event;
- private final String nodeIPAddress;
-
- public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) {
- this.nodeId = nodeId;
- this.nodeIPAddress = nodeIPAddress;
- this.event = event;
- }
-
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.REPLICA_EVENT;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public ClusterEventType getEvent() {
- return event;
- }
-
- public String getNodeIPAddress() {
- return nodeIPAddress;
- }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
deleted file mode 100644
index a2b94a7..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
- private static final long serialVersionUID = 1L;
- public long maxResourceId;
-
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_RESPONSE;
- }
-
- public long getMaxResourceId() {
- return maxResourceId;
- }
-
- public void setMaxResourceId(long maxResourceId) {
- this.maxResourceId = maxResourceId;
- }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
deleted file mode 100644
index f7a276b..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
- }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
deleted file mode 100644
index 5d07d5d..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
-
- private static final long serialVersionUID = 1L;
- private final Integer[] partitions;
- private final long requestId;
- private final String nodeId;
-
- public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
- this.requestId = requestId;
- this.nodeId = nodeId;
- this.partitions = partitionsToTakeover;
- }
-
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
- }
-
- public Integer[] getPartitions() {
- return partitions;
- }
-
- public long getRequestId() {
- return requestId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Request ID: " + requestId);
- sb.append(" Node ID: " + nodeId);
- sb.append(" Partitions: ");
- for (Integer partitionId : partitions) {
- sb.append(partitionId + ",");
- }
- //remove last comma
- sb.charAt(sb.length() - 1);
- return sb.toString();
- }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5f08dd8..b93082d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -18,30 +18,12 @@
*/
package org.apache.asterix.common.messaging.api;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.messages.IMessage;
+import org.apache.hyracks.api.service.IControllerService;
public interface IApplicationMessage extends IMessage {
- public enum ApplicationMessageType {
- RESOURCE_ID_REQUEST,
- RESOURCE_ID_RESPONSE,
- REPORT_MAX_RESOURCE_ID_REQUEST,
- REPORT_MAX_RESOURCE_ID_RESPONSE,
- TAKEOVER_PARTITIONS_REQUEST,
- TAKEOVER_PARTITIONS_RESPONSE,
- TAKEOVER_METADATA_NODE_REQUEST,
- TAKEOVER_METADATA_NODE_RESPONSE,
- PREPARE_PARTITIONS_FAILBACK_REQUEST,
- PREPARE_PARTITIONS_FAILBACK_RESPONSE,
- COMPLETE_FAILBACK_REQUEST,
- COMPLETE_FAILBACK_RESPONSE,
- REPLICA_EVENT,
- ACTIVE_ENTITY_TO_CC_MESSAGE,
- ACTIVE_MANAGER_MESSAGE
- }
-
- public abstract ApplicationMessageType getMessageType();
-
/**
* Sets a unique message id that identifies this message within an NC.
* This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
@@ -55,4 +37,16 @@
* @return The unique message id if it has been set, otherwise 0.
*/
public long getId();
+
+ /**
+ * handle the message upon delivery
+ */
+ public void handle(IControllerService cs) throws HyracksDataException;
+
+ /**
+ * get a string representation for the message type
+ *
+ * @return
+ */
+ public String type();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 7dafbd5..2290b93 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -21,9 +21,11 @@
import org.apache.hyracks.api.messages.IMessageBroker;
public interface ICCMessageBroker extends IMessageBroker {
+ public static final long NO_CALLBACK_MESSAGE_ID = -1;
/**
* Sends the passed message to the specified {@code nodeId}
+ *
* @param msg
* @param nodeId
* @throws Exception
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
similarity index 91%
rename from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index 53a3cbd..74589cc 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.bootstrap;
+package org.apache.asterix.common.metadata;
public class MetadataIndexImmutableProperties {
private final String indexName;
@@ -45,7 +45,7 @@
return datasetId;
}
- // Right now, we only have primary indexes. Hence, dataset name is always index name
+ // Right now, we only have primary Metadata indexes. Hence, dataset name is always index name
public String getDatasetName() {
return indexName;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
similarity index 72%
copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
index daeb9c4..06038cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
@@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.common.transactions;
-public class ResourceIdRequestMessage extends AbstractApplicationMessage {
- private static final long serialVersionUID = 1L;
+public interface IAsterixResourceIdManager {
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.RESOURCE_ID_REQUEST;
- }
+ long createResourceId();
+
+ boolean reported(String nodeId);
+
+ void report(String nodeId, long maxResourceId);
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index b49a719..e4f21a6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -23,8 +23,8 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
public interface IExternalDataSourceFactory extends Serializable {
@@ -73,11 +73,11 @@
AlgebricksAbsolutePartitionConstraint constraints, int count) {
if (constraints == null) {
ArrayList<String> locs = new ArrayList<String>();
- Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+ Map<String, String[]> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores();
int i = 0;
while (i < count) {
for (String node : stores.keySet()) {
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
+ int numIODevices = AsterixClusterProperties.INSTANCE.getIODevices(node).length;
for (int k = 0; k < numIODevices; k++) {
locs.add(node);
i++;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index cf4ed19..87ee167 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.dataflow;
-import javax.annotation.Nonnull;
-
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -34,7 +32,7 @@
protected final FeedLogManager feedLogManager;
public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
- @Nonnull FeedLogManager feedLogManager, int numOfFields) {
+ FeedLogManager feedLogManager, int numOfFields) {
this.feedLogManager = feedLogManager;
this.numOfFields = numOfFields;
this.ctx = ctx;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 10e9125..859a011 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -25,14 +25,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nonnull;
-
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.api.IFeedMarker;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -57,8 +55,8 @@
private Future<?> dataflowMarkerResult;
public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
- @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
- @Nonnull IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+ FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser,
+ IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
this.dataParser = dataParser;
this.recordReader = recordReader;
@@ -101,13 +99,13 @@
try {
tupleForwarder.close();
} catch (Throwable th) {
- hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
}
try {
recordReader.close();
} catch (Throwable th) {
LOGGER.warn("Failure during while operating a feed sourcec", th);
- hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
} finally {
closeSignal();
if (sendMarker && dataflowMarkerResult != null) {
@@ -182,12 +180,12 @@
try {
tupleForwarder.close();
} catch (Throwable th) {
- hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
}
try {
recordReader.close();
} catch (Throwable th) {
- hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
}
if (hde != null) {
throw hde;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 36c6c2f..b8d0532 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -20,8 +20,6 @@
import java.io.IOException;
-import javax.annotation.Nonnull;
-
import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.external.util.FeedLogManager;
@@ -46,7 +44,7 @@
private boolean paused = false;
private boolean initialized;
- public FeedTupleForwarder(@Nonnull FeedLogManager feedLogManager) {
+ public FeedTupleForwarder(FeedLogManager feedLogManager) {
this.feedLogManager = feedLogManager;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index 9c8563d..2b06775 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -20,8 +20,6 @@
import java.io.IOException;
-import javax.annotation.Nonnull;
-
import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
@@ -34,7 +32,7 @@
private final IExternalIndexer indexer;
public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
- @Nonnull IRecordDataParser<T> dataParser, @Nonnull IRecordReader<? extends T> recordReader,
+ IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader,
IExternalIndexer indexer) throws IOException {
super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields());
this.indexer = indexer;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 915c343..e780834 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.dataflow;
-import javax.annotation.Nonnull;
-
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
@@ -38,7 +36,7 @@
protected final int numOfTupleFields;
public RecordDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
- @Nonnull IRecordDataParser<T> dataParser, @Nonnull IRecordReader<? extends T> recordReader,
+ IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader,
int numOfTupleFields) {
super(ctx, tupleForwarder);
this.dataParser = dataParser;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 7f2191c..0c49c92 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -43,7 +43,7 @@
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.external.util.FeedUtils.JobType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -202,7 +202,7 @@
}
}
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
List<String> intakeLocations = new ArrayList<>();
for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
@@ -349,7 +349,7 @@
private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, ActiveEvent message)
throws Exception {
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
JobInfo info = hcc.getJobInfo(message.getJobId());
JobStatus status = info.getStatus();
EntityId feedId = intakeInfo.getFeedId();
@@ -369,7 +369,7 @@
private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
FeedConnectionId connectionId = cInfo.getConnectionId();
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
JobInfo info = hcc.getJobInfo(cInfo.getJobId());
JobStatus status = info.getStatus();
boolean failure = status != null && status.equals(JobStatus.FAILURE);
@@ -523,7 +523,7 @@
}
try {
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
JobInfo info = hcc.getJobInfo(cInfo.getJobId());
List<String> collectLocations = new ArrayList<>();
for (OperatorDescriptorId collectOpId : collectOperatorIds) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index 6c04b2d..2adce1c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -19,7 +19,6 @@
package org.apache.asterix.external.feed.runtime;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.log4j.Logger;
@@ -58,12 +57,7 @@
continueIngestion = false;
} catch (Exception e) {
LOGGER.error("Exception during feed ingestion ", e);
- // Check if the adapter wants to continue ingestion
- if (ExternalDataExceptionUtils.isResolvable(e)) {
- continueIngestion = adapter.handleException(e);
- } else {
- continueIngestion = false;
- }
+ continueIngestion = adapter.handleException(e);
failedIngestion = !continueIngestion;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 88964a1..006bf0b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -20,10 +20,9 @@
import java.io.IOException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class QuotedLineRecordReader extends LineRecordReader {
@@ -36,7 +35,7 @@
throws HyracksDataException {
super(hasHeader, stream);
if ((quoteString == null) || (quoteString.length() != 1)) {
- throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
+ throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(
ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
}
this.quote = quoteString.charAt(0);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 26ac3cb..4d6d004 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,10 +20,9 @@
import java.io.IOException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SemiStructuredRecordReader extends StreamRecordReader {
@@ -42,7 +41,7 @@
if (recStartString != null) {
if (recStartString.length() != 1) {
throw new HyracksDataException(
- ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
+ ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
}
recordStart = recStartString.charAt(0);
@@ -53,7 +52,7 @@
if (recEndString != null) {
if (recEndString.length() != 1) {
throw new HyracksDataException(
- ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
+ ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
}
recordEnd = recEndString.charAt(0);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 26f8654..7995091 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -23,8 +23,8 @@
import java.net.ServerSocket;
import java.net.Socket;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
@@ -116,14 +116,14 @@
}
socket = null;
} catch (IOException e) {
- hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
}
try {
if (server != null) {
server.close();
}
} catch (IOException e) {
- hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
} finally {
server = null;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index f63b895..aafee8e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -32,7 +32,7 @@
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.SocketServerInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
+import org.apache.asterix.runtime.util.RuntimeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -64,8 +64,8 @@
"\'sockets\' parameter not specified as part of adapter configuration");
}
Map<InetAddress, Set<String>> ncMap;
- ncMap = AsterixRuntimeUtil.getNodeControllerMap();
- List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+ ncMap = RuntimeUtils.getNodeControllerMap();
+ List<String> ncs = RuntimeUtils.getAllNodeControllers();
String[] socketsArray = socketsValue.split(",");
Random random = new Random();
for (String socket : socketsArray) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index 8052822..5575d2c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -26,7 +26,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index b0594d2..789e9ba 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -31,7 +31,7 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -70,7 +70,7 @@
ILibraryManager libraryManager;
if (context == null) {
// Gets the library manager for compile-time constant folding.
- libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
+ libraryManager = AsterixAppContextInfo.INSTANCE.getLibraryManager();
} else {
// Gets the library manager for real runtime evaluation.
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) context.getJobletContext()
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index e98e744..ebe3276 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -20,8 +20,6 @@
import java.util.Map;
-import javax.annotation.Nonnull;
-
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IDataParserFactory;
@@ -57,7 +55,7 @@
}
@SuppressWarnings("rawtypes")
- public static IDataParserFactory getDataParserFactory(@Nonnull String parser) throws AsterixException {
+ public static IDataParserFactory getDataParserFactory(String parser) throws AsterixException {
switch (parser) {
case ExternalDataConstants.FORMAT_ADM:
case ExternalDataConstants.FORMAT_JSON:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 9c378a0..44980c8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -27,7 +27,7 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 936ada5..0848532 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -29,8 +29,8 @@
import org.apache.asterix.external.indexing.IndexingScheduler;
import org.apache.asterix.external.indexing.RecordId.RecordIdType;
import org.apache.asterix.external.input.stream.HDFSInputStream;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -49,7 +49,7 @@
public class HDFSUtils {
public static Scheduler initializeHDFSScheduler() {
- ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+ ICCContext ccContext = AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext();
Scheduler scheduler = null;
try {
scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
@@ -61,7 +61,7 @@
}
public static IndexingScheduler initializeIndexingHDFSScheduler() {
- ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+ ICCContext ccContext = AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext();
IndexingScheduler scheduler = null;
try {
scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
@@ -80,6 +80,7 @@
* 1. NoOp means appended file
* 2. AddOp means new file
* 3. UpdateOp means the delta of a file
+ *
* @return
* @throws IOException
*/
@@ -109,7 +110,7 @@
.add(new FileSplit(filePath,
block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
? block.getLength() : (file.getSize() - block.getOffset()),
- block.getHosts()));
+ block.getHosts()));
orderedExternalFiles.add(file);
}
}
@@ -201,17 +202,17 @@
public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
AlgebricksAbsolutePartitionConstraint clusterLocations) {
if (clusterLocations == null) {
- ArrayList<String> locs = new ArrayList<String>();
- Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
- for (String i : stores.keySet()) {
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
+ ArrayList<String> locs = new ArrayList<>();
+ Map<String, String[]> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores();
+ for (String node : stores.keySet()) {
+ int numIODevices = AsterixClusterProperties.INSTANCE.getIODevices(node).length;
for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
+ locs.add(node);
}
}
String[] cluster = new String[locs.size()];
cluster = locs.toArray(cluster);
- clusterLocations = new AlgebricksAbsolutePartitionConstraint(cluster);
+ return new AlgebricksAbsolutePartitionConstraint(cluster);
}
return clusterLocations;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
index 61764d7..52c2850 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
@@ -29,7 +29,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
+import org.apache.asterix.runtime.util.RuntimeUtils;
/**
* Resolves a value (DNS/IP Address) or a (Node Controller Id) to the id of a Node Controller running at the location.
@@ -86,7 +86,7 @@
private static void updateNCs() throws Exception {
synchronized (ncMap) {
ncMap.clear();
- AsterixRuntimeUtil.getNodeControllerMap(ncMap);
+ RuntimeUtils.getNodeControllerMap(ncMap);
synchronized (ncs) {
ncs.clear();
for (Entry<InetAddress, Set<String>> entry : ncMap.entrySet()) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index f5afef6..61f5021 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -22,7 +22,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 182cbda..c929b41 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -35,6 +35,7 @@
import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.AbstractOperationCallback;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
@@ -52,7 +53,6 @@
import org.apache.asterix.metadata.api.IMetadataIndex;
import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.api.IValueExtractor;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import org.apache.asterix.metadata.entities.CompactionPolicy;
import org.apache.asterix.metadata.entities.Dataset;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/ExtensionMetadataDataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/ExtensionMetadataDataset.java
index d3f7e1c..d25f488 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/ExtensionMetadataDataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/ExtensionMetadataDataset.java
@@ -20,8 +20,8 @@
import java.util.List;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.metadata.bootstrap.MetadataIndex;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 616c724..2629fea 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -67,8 +67,8 @@
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index 6c8bebd..db6145a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.List;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlBinaryHashFunctionFactoryProvider;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 8201627..833f3e5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -21,6 +21,7 @@
import java.util.Arrays;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.metadata.api.IMetadataIndex;
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.types.BuiltinType;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index e7bf3bf..f076152 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -45,8 +45,8 @@
import org.apache.asterix.event.util.PatternCreator;
import org.apache.asterix.installer.schema.conf.Configuration;
import org.apache.asterix.metadata.api.IClusterManager;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
public class ClusterManager implements IClusterManager {
@@ -101,13 +101,13 @@
try {
Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
List<Pattern> pattern = new ArrayList<Pattern>();
- String asterixInstanceName = AsterixAppContextInfo.getInstance().getMetadataProperties().getInstanceName();
+ String asterixInstanceName = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getInstanceName();
Patterns prepareNode = PatternCreator.INSTANCE.createPrepareNodePattern(asterixInstanceName,
AsterixClusterProperties.INSTANCE.getCluster(), node);
cluster.getNode().add(node);
client.submit(prepareNode);
- AsterixExternalProperties externalProps = AsterixAppContextInfo.getInstance().getExternalProperties();
+ AsterixExternalProperties externalProps = AsterixAppContextInfo.INSTANCE.getExternalProperties();
AsterixEventServiceUtil.poulateClusterEnvironmentProperties(cluster, externalProps.getCCJavaParams(),
externalProps.getNCJavaParams());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
index 9e65d62..b5e6449 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
@@ -21,7 +21,7 @@
import java.util.HashSet;
import java.util.Set;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.hyracks.algebricks.common.utils.Pair;
/**
@@ -113,7 +113,7 @@
if (intValue < 0) {
return new Pair<Boolean, String>(false, "Value must be >= 0");
}
- int numNodesInCluster = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames()
+ int numNodesInCluster = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames()
.size();
if (numNodesInCluster < intValue) {
return new Pair<Boolean, String>(false,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 0975163..d8ac7b9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -85,14 +85,15 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -103,7 +104,6 @@
import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -187,8 +187,8 @@
public AqlMetadataProvider(Dataverse defaultDataverse) {
this.defaultDataverse = defaultDataverse;
- this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
- this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
+ this.storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
+ this.libraryManager = AsterixAppContextInfo.INSTANCE.getLibraryManager();
}
public String getPropertyValue(String propertyName) {
@@ -489,10 +489,11 @@
}
Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
- secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
- DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
- dataset.hasMetaPart(), primaryKeyIndicators, secondaryIndex.getKeyFieldSourceIndicators(),
- metaType);
+ secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
+ DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+ dataset.hasMetaPart(), primaryKeyIndicators,
+ secondaryIndex.getKeyFieldSourceIndicators(),
+ metaType);
comparatorFactories = comparatorFactoriesAndTypeTraits.first;
typeTraits = comparatorFactoriesAndTypeTraits.second;
if (filterTypeTraits != null) {
@@ -569,12 +570,12 @@
int[] buddyBreeFields = new int[] { numSecondaryKeys };
ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
new ExternalBTreeWithBuddyDataflowHelperFactory(
- compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
- getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
- ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+ getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
@@ -2245,4 +2246,4 @@
throw new AlgebricksException("Only record types can be indexed.");
}
}
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index d74bf9f..5853a39 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -32,8 +32,8 @@
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index 0b22dab..3dfa8a5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -36,7 +36,7 @@
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index a31f2a6..85e4405 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -28,7 +28,7 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index d247490..5a88a9f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -59,6 +59,7 @@
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -69,7 +70,6 @@
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 5ba6ad2..53bd164 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -65,6 +65,7 @@
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -75,7 +76,6 @@
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 4da5fd4..46a2076 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -38,9 +38,9 @@
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
public class RemoteRecoveryManager implements IRemoteRecoveryManager {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 41fc0b8..01d6db7 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -40,10 +40,10 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
similarity index 91%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
index 4aba0c2..2eb1be9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
@@ -16,7 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
new file mode 100644
index 0000000..c96bcb8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(CompleteFailbackRequestMessage.class.getName());
+ private final Set<Integer> partitions;
+ private final String nodeId;
+
+ public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+ super(planId, requestId);
+ this.nodeId = nodeId;
+ this.partitions = partitions;
+ }
+
+ public Set<Integer> getPartitions() {
+ return partitions;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Plan ID: " + planId);
+ sb.append(" Node ID: " + nodeId);
+ sb.append(" Partitions: " + partitions);
+ return sb.toString();
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAsterixAppRuntimeContext appContext =
+ (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ HyracksDataException hde = null;
+ try {
+ IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+ remoteRecoeryManager.completeFailbackProcess();
+ } catch (IOException | InterruptedException e) {
+ LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e);
+ hde = ExceptionUtils.convertToHyracksDataException(e);
+ } finally {
+ CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId,
+ requestId, partitions);
+ try {
+ broker.sendMessageToCC(reponse, null);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ }
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ }
+
+ @Override
+ public String type() {
+ return "COMPLETE_FAILBACK_REQUEST";
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
similarity index 75%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index a036ff0..3c4b58c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -16,10 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
import java.util.Set;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage {
private static final long serialVersionUID = 1L;
@@ -30,11 +34,6 @@
this.partitions = partitions;
}
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.COMPLETE_FAILBACK_RESPONSE;
- }
-
public Set<Integer> getPartitions() {
return partitions;
}
@@ -46,4 +45,14 @@
sb.append(" Partitions: " + partitions);
return sb.toString();
}
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(this);
+ }
+
+ @Override
+ public String type() {
+ return "COMPLETE_FAILBACK_RESPONSE";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
similarity index 96%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
index 0591644..24f5fe8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
@@ -16,16 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.replication;
+package org.apache.asterix.runtime.message;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-
public class NodeFailbackPlan {
public enum FailbackPlanState {
@@ -107,7 +104,7 @@
}
}
- if (failedRequests.size() > 0) {
+ if (!failedRequests.isEmpty()) {
state = FailbackPlanState.FAILED;
for (Integer failedRequestId : failedRequests) {
markRequestCompleted(failedRequestId);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
new file mode 100644
index 0000000..e3b9fbe
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.rmi.RemoteException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName());
+ private final Set<Integer> partitions;
+ private boolean releaseMetadataNode = false;
+ private final String nodeID;
+
+ public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+ super(planId, requestId);
+ this.nodeID = nodeId;
+ this.partitions = partitions;
+ }
+
+ public Set<Integer> getPartitions() {
+ return partitions;
+ }
+
+ public boolean isReleaseMetadataNode() {
+ return releaseMetadataNode;
+ }
+
+ public void setReleaseMetadataNode(boolean releaseMetadataNode) {
+ this.releaseMetadataNode = releaseMetadataNode;
+ }
+
+ public String getNodeID() {
+ return nodeID;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Plan ID: " + planId);
+ sb.append(" Partitions: " + partitions);
+ sb.append(" releaseMetadataNode: " + releaseMetadataNode);
+ return sb.toString();
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAsterixAppRuntimeContext appContext =
+ (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ /**
+ * if the metadata partition will be failed back
+ * we need to flush and close all datasets including metadata datasets
+ * otherwise we need to close all non-metadata datasets and flush metadata datasets
+ * so that their memory components will be copied to the failing back node
+ */
+ if (releaseMetadataNode) {
+ appContext.getDatasetLifecycleManager().closeAllDatasets();
+ //remove the metadata node stub from RMI registry
+ try {
+ appContext.unexportMetadataNodeStub();
+ } catch (RemoteException e) {
+ LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e);
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ } else {
+ //close all non-metadata datasets
+ appContext.getDatasetLifecycleManager().closeUserDatasets();
+ //flush the remaining metadata datasets that were not closed
+ appContext.getDatasetLifecycleManager().flushAllDatasets();
+ }
+
+ //mark the partitions to be closed as inactive
+ PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
+ .getLocalResourceRepository();
+ for (Integer partitionId : partitions) {
+ localResourceRepo.addInactivePartition(partitionId);
+ }
+
+ //send response after partitions prepared for failback
+ PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId,
+ requestId, partitions);
+ try {
+ broker.sendMessageToCC(reponse, null);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ @Override
+ public String type() {
+ return "PREPARE_PARTITIONS_FAILBACK_REQUEST";
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
similarity index 72%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index 467c6cb..2e52773 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -16,10 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
import java.util.Set;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
private static final long serialVersionUID = 1L;
@@ -30,12 +34,17 @@
this.partitions = partitions;
}
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_RESPONSE;
- }
-
public Set<Integer> getPartitions() {
return partitions;
}
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ AsterixClusterProperties.INSTANCE.processPreparePartitionsFailbackResponse(this);
+ }
+
+ @Override
+ public String type() {
+ return "PREPARE_PARTITIONS_FAILBACK_RESPONSE";
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
new file mode 100644
index 0000000..2aa4746
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReplicaEventMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final ClusterEventType event;
+ private final String nodeIPAddress;
+
+ public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) {
+ this.nodeId = nodeId;
+ this.nodeIPAddress = nodeIPAddress;
+ this.event = event;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public ClusterEventType getEvent() {
+ return event;
+ }
+
+ public String getNodeIPAddress() {
+ return nodeIPAddress;
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAsterixAppRuntimeContext appContext =
+ (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ Node node = new Node();
+ node.setId(nodeId);
+ node.setClusterIp(nodeIPAddress);
+ Replica replica = new Replica(node);
+ appContext.getReplicationManager().reportReplicaEvent(new ReplicaEvent(replica, event));
+ }
+
+ @Override
+ public String type() {
+ return "REPLICA_EVENT";
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
new file mode 100644
index 0000000..1604e29
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
+ private final long maxResourceId;
+ private final String src;
+
+ public ReportMaxResourceIdMessage(String src, long maxResourceId) {
+ this.src = src;
+ this.maxResourceId = maxResourceId;
+ }
+
+ public long getMaxResourceId() {
+ return maxResourceId;
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ IAsterixResourceIdManager resourceIdManager =
+ AsterixAppContextInfo.INSTANCE.getResourceIdManager();
+ resourceIdManager.report(src, maxResourceId);
+ }
+
+ public static void send(NodeControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = cs;
+ IAsterixAppRuntimeContext appContext =
+ (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+ ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
+ try {
+ ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg, null);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ @Override
+ public String type() {
+ return "REPORT_MAX_RESOURCE_ID_RESPONSE";
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
similarity index 64%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
index d2837ce..3e2becf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -16,22 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
private static final long serialVersionUID = 1L;
- public long maxResourceId;
@Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_REQUEST;
+ public void handle(IControllerService cs) throws HyracksDataException {
+ ReportMaxResourceIdMessage.send((NodeControllerService) cs);
}
- public long getMaxResourceId() {
- return maxResourceId;
- }
-
- public void setMaxResourceId(long maxResourceId) {
- this.maxResourceId = maxResourceId;
+ @Override
+ public String type() {
+ return "REPORT_MAX_RESOURCE_ID_REQUEST";
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
new file mode 100644
index 0000000..5e7d808
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+ private final String src;
+
+ public ResourceIdRequestMessage(String src) {
+ this.src = src;
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ try {
+ ICCMessageBroker broker =
+ (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+ ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
+ reponse.setId(id);
+ if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
+ reponse.setResourceId(-1);
+ reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
+ } else {
+ IAsterixResourceIdManager resourceIdManager =
+ AsterixAppContextInfo.INSTANCE.getResourceIdManager();
+ reponse.setResourceId(resourceIdManager.createResourceId());
+ if (reponse.getResourceId() < 0) {
+ reponse.setException(new Exception("One or more nodes has not reported max resource id."));
+ }
+ requestMaxResourceID(resourceIdManager, broker);
+ }
+ broker.sendApplicationMessageToNC(reponse, src);
+ } catch (Exception e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ private void requestMaxResourceID(IAsterixResourceIdManager resourceIdManager, ICCMessageBroker broker)
+ throws Exception {
+ Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+ ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
+ msg.setId(ICCMessageBroker.NO_CALLBACK_MESSAGE_ID);
+ for (String nodeId : getParticipantNodes) {
+ if (!resourceIdManager.reported(nodeId)) {
+ broker.sendApplicationMessageToNC(msg, nodeId);
+ }
+ }
+ }
+
+ @Override
+ public String type() {
+ return "RESOURCE_ID_REQUEST";
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
similarity index 69%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index 09c50d3..62c2163 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -16,7 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
private static final long serialVersionUID = 1L;
@@ -24,11 +28,6 @@
private long resourceId;
private Exception exception;
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.RESOURCE_ID_RESPONSE;
- }
-
public long getResourceId() {
return resourceId;
}
@@ -44,4 +43,15 @@
public void setException(Exception exception) {
this.exception = exception;
}
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ // Do nothing. for this message, the callback handles it, we probably should get rid of callbacks and
+ // instead, use the handle in the response to perform callback action
+ }
+
+ @Override
+ public String type() {
+ return "RESOURCE_ID_RESPONSE";
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..b18a879
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAsterixAppRuntimeContext appContext =
+ (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ HyracksDataException hde = null;
+ try {
+ appContext.initializeMetadata(false);
+ appContext.exportMetadataNodeStub();
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
+ hde = new HyracksDataException(e);
+ } finally {
+ TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
+ appContext.getTransactionSubsystem().getId());
+ try {
+ broker.sendMessageToCC(reponse, null);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ }
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ }
+
+ @Override
+ public String type() {
+ return "TAKEOVER_METADATA_NODE_REQUEST";
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
similarity index 67%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index bc98a62..f7016bc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -16,7 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
@@ -27,12 +32,17 @@
this.nodeId = nodeId;
}
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
- }
-
public String getNodeId() {
return nodeId;
}
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(this);
+ }
+
+ @Override
+ public String type() {
+ return "TAKEOVER_METADATA_NODE_RESPONSE";
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..3b91084
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
+ private final Integer[] partitions;
+ private final long requestId;
+ private final String nodeId;
+
+ public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+ this.requestId = requestId;
+ this.nodeId = nodeId;
+ this.partitions = partitionsToTakeover;
+ }
+
+ public Integer[] getPartitions() {
+ return partitions;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Request ID: " + requestId);
+ sb.append(" Node ID: " + nodeId);
+ sb.append(" Partitions: ");
+ for (Integer partitionId : partitions) {
+ sb.append(partitionId + ",");
+ }
+ //remove last comma
+ sb.charAt(sb.length() - 1);
+ return sb.toString();
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAsterixAppRuntimeContext appContext =
+ (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ //if the NC is shutting down, it should ignore takeover partitions request
+ if (!appContext.isShuttingdown()) {
+ HyracksDataException hde = null;
+ try {
+ IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+ remoteRecoeryManager.takeoverPartitons(partitions);
+ } catch (IOException | ACIDException e) {
+ LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ } finally {
+ //send response after takeover is completed
+ TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
+ appContext.getTransactionSubsystem().getId(), partitions);
+ try {
+ broker.sendMessageToCC(reponse, null);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
+ hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ }
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ }
+ }
+
+ @Override
+ public String type() {
+ return "TAKEOVER_PARTITIONS_REQUEST";
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
similarity index 72%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index 54597d9..9ec71b7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -16,7 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
@@ -31,11 +36,6 @@
this.partitions = partitionsToTakeover;
}
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
- }
-
public Integer[] getPartitions() {
return partitions;
}
@@ -47,4 +47,14 @@
public long getRequestId() {
return requestId;
}
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException {
+ AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(this);
+ }
+
+ @Override
+ public String type() {
+ return "TAKEOVER_PARTITIONS_RESPONSE";
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
similarity index 82%
rename from asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index ab1ebe1..4cb65c2 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -16,30 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.transaction.management.resource;
+package org.apache.asterix.runtime.transaction;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
+import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
import org.apache.hyracks.api.application.IApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
/**
- * A resource id factory that generates unique resource ids across all NCs by requesting unique ids from the cluster controller.
+ * A resource id factory that generates unique resource ids across all NCs by requesting
+ * unique ids from the cluster controller.
*/
public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
private final IApplicationContext appCtx;
private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
+ private final String nodeId;
public GlobalResourceIdFactory(IApplicationContext appCtx) {
this.appCtx = appCtx;
this.resourceIdResponseQ = new LinkedBlockingQueue<>();
+ this.nodeId = ((NodeControllerService) appCtx.getControllerService()).getApplicationContext().getNodeId();
}
@Override
@@ -47,16 +51,16 @@
try {
ResourceIdRequestResponseMessage reponse = null;
//if there already exists a response, use it
- if (resourceIdResponseQ.size() > 0) {
+ if (!resourceIdResponseQ.isEmpty()) {
synchronized (resourceIdResponseQ) {
- if (resourceIdResponseQ.size() > 0) {
+ if (!resourceIdResponseQ.isEmpty()) {
reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
}
}
}
//if no response available or it has an exception, request a new one
if (reponse == null || reponse.getException() != null) {
- ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
+ ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
if (reponse.getException() != null) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
similarity index 94%
rename from asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
index 7bcf379..9027f75 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.transaction.management.resource;
+package org.apache.asterix.runtime.transaction;
import org.apache.hyracks.api.application.IApplicationContext;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
similarity index 84%
rename from asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
index e7b15a2..4f2cebd 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.om.util;
+package org.apache.asterix.runtime.util;
import java.io.IOException;
import java.util.logging.Logger;
@@ -37,7 +37,7 @@
import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -51,9 +51,11 @@
*/
public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IAsterixPropertiesProvider {
- private static AsterixAppContextInfo INSTANCE;
- private final ICCApplicationContext appCtx;
-
+ public static final AsterixAppContextInfo INSTANCE = new AsterixAppContextInfo();
+ private ICCApplicationContext appCtx;
+ private IGlobalRecoveryMaanger globalRecoveryMaanger;
+ private ILibraryManager libraryManager;
+ private IAsterixResourceIdManager resourceIdManager;
private AsterixCompilerProperties compilerProperties;
private AsterixExternalProperties externalProperties;
private AsterixMetadataProperties metadataProperties;
@@ -64,19 +66,26 @@
private AsterixReplicationProperties replicationProperties;
private AsterixExtensionProperties extensionProperties;
private MessagingProperties messagingProperties;
- private final IGlobalRecoveryMaanger globalRecoveryMaanger;
private IHyracksClientConnection hcc;
- private final ILibraryManager libraryManager;
private Object extensionManager;
+ private volatile boolean initialized = false;
- public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
- IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager)
+ private AsterixAppContextInfo() {
+ }
+
+ public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+ IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager,
+ IAsterixResourceIdManager resourceIdManager)
throws AsterixException, IOException {
- if (INSTANCE != null) {
- return;
+ if (INSTANCE.initialized) {
+ throw new AsterixException(AsterixAppContextInfo.class.getSimpleName() + " has been initialized already");
}
- INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger, libraryManager);
-
+ INSTANCE.initialized = true;
+ INSTANCE.appCtx = ccAppCtx;
+ INSTANCE.hcc = hcc;
+ INSTANCE.globalRecoveryMaanger = globalRecoveryMaanger;
+ INSTANCE.libraryManager = libraryManager;
+ INSTANCE.resourceIdManager = resourceIdManager;
// Determine whether to use old-style asterix-configuration.xml or new-style configuration.
// QQQ strip this out eventually
AsterixPropertiesAccessor propertiesAccessor;
@@ -102,16 +111,8 @@
Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
}
- private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
- IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager) {
- this.appCtx = ccAppCtx;
- this.hcc = hcc;
- this.globalRecoveryMaanger = globalRecoveryMaanger;
- this.libraryManager = libraryManager;
- }
-
- public static AsterixAppContextInfo getInstance() {
- return INSTANCE;
+ public boolean initialized() {
+ return initialized;
}
@Override
@@ -199,4 +200,8 @@
public MessagingProperties getMessagingProperties() {
return messagingProperties;
}
+
+ public IAsterixResourceIdManager getResourceIdManager() {
+ return resourceIdManager;
+ }
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
similarity index 81%
rename from asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
index 5acf9ae..2457ddc 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.om.util;
+package org.apache.asterix.runtime.util;
import java.io.InputStream;
import java.util.ArrayList;
@@ -26,6 +26,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.logging.Level;
@@ -38,20 +39,20 @@
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReplicaEventMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.replication.NodeFailbackPlan;
-import org.apache.asterix.common.replication.NodeFailbackPlan.FailbackPlanState;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NodeFailbackPlan;
+import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
import org.json.JSONException;
@@ -62,10 +63,10 @@
*/
public class AsterixClusterProperties {
- /**
- * TODO: currently after instance restarts we require all nodes to join again, otherwise the cluster wont be ACTIVE.
- * we may overcome this by storing the cluster state before the instance shutdown and using it on startup to identify
- * the nodes that are expected the join.
+ /*
+ * TODO: currently after instance restarts we require all nodes to join again,
+ * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance
+ * shutdown and using it on startup to identify the nodes that are expected the join.
*/
private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
@@ -75,7 +76,7 @@
private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
private static final String IO_DEVICES = "iodevices";
private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
- private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<String, Map<String, String>>();
+ private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
private final Cluster cluster;
private ClusterState state = ClusterState.UNUSABLE;
@@ -105,24 +106,23 @@
Unmarshaller unmarshaller = ctx.createUnmarshaller();
cluster = (Cluster) unmarshaller.unmarshal(is);
} catch (JAXBException e) {
- throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
+ throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e);
}
} else {
cluster = null;
}
// if this is the CC process
- if (AsterixAppContextInfo.getInstance() != null) {
- if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
- node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
- clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
- currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
- replicationEnabled = isReplicationEnabled();
- autoFailover = isAutoFailoverEnabled();
- if (autoFailover) {
- pendingTakeoverRequests = new HashMap<>();
- pendingProcessingFailbackPlans = new LinkedList<>();
- planId2FailbackPlanMap = new HashMap<>();
- }
+ if (AsterixAppContextInfo.INSTANCE.initialized()
+ && AsterixAppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+ node2PartitionsMap = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
+ clusterPartitions = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
+ currentMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+ replicationEnabled = isReplicationEnabled();
+ autoFailover = isAutoFailoverEnabled();
+ if (autoFailover) {
+ pendingTakeoverRequests = new HashMap<>();
+ pendingProcessingFailbackPlans = new LinkedList<>();
+ planId2FailbackPlanMap = new HashMap<>();
}
}
}
@@ -214,12 +214,9 @@
state = ClusterState.ACTIVE;
LOGGER.info("Cluster is now ACTIVE");
//start global recovery
- AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery();
- if (autoFailover) {
- //if there are any pending failback requests, process them
- if (pendingProcessingFailbackPlans.size() > 0) {
- processPendingFailbackPlans();
- }
+ AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
+ if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
+ processPendingFailbackPlans();
}
} else {
requestMetadataNodeTakeover();
@@ -227,26 +224,11 @@
}
/**
- * Returns the number of IO devices configured for a Node Controller
- *
- * @param nodeId
- * unique identifier of the Node Controller
- * @return number of IO devices. -1 if the node id is not valid. A node id
- * is not valid if it does not correspond to the set of registered
- * Node Controllers.
- */
- public int getNumberOfIODevices(String nodeId) {
- String[] ioDevs = getIODevices(nodeId);
- return ioDevs == null ? -1 : ioDevs.length;
- }
-
- /**
* Returns the IO devices configured for a Node Controller
*
* @param nodeId
* unique identifier of the Node Controller
- * @return a list of IO devices. null if node id is not valid. A node id is not valid
- * if it does not correspond to the set of registered Node Controllers.
+ * @return a list of IO devices.
*/
public synchronized String[] getIODevices(String nodeId) {
Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
@@ -255,7 +237,7 @@
LOGGER.warning("Configuration parameters for nodeId " + nodeId
+ " not found. The node has not joined yet or has left.");
}
- return null;
+ return new String[0];
}
return ncConfig.get(IO_DEVICES).split(",");
}
@@ -274,7 +256,7 @@
}
public synchronized Set<String> getParticipantNodes() {
- Set<String> participantNodes = new HashSet<String>();
+ Set<String> participantNodes = new HashSet<>();
for (String pNode : activeNcConfiguration.keySet()) {
participantNodes.add(pNode);
}
@@ -316,7 +298,7 @@
}
public static int getNumberOfNodes() {
- return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
+ return AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
}
public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
@@ -349,12 +331,12 @@
private synchronized void requestPartitionsTakeover(String failedNodeId) {
//replica -> list of partitions to takeover
Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
- AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+ AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
.getReplicationProperties();
//collect the partitions of the failed NC
List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
- if (lostPartitions.size() > 0) {
+ if (!lostPartitions.isEmpty()) {
for (ClusterPartition partition : lostPartitions) {
//find replicas for this partitions
Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
@@ -362,15 +344,8 @@
for (String replica : partitionReplicas) {
//TODO (mhubail) currently this assigns the partition to the first found active replica.
//It needs to be modified to consider load balancing.
- if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
- if (!partitionRecoveryPlan.containsKey(replica)) {
- List<Integer> replicaPartitions = new ArrayList<>();
- replicaPartitions.add(partition.getPartitionId());
- partitionRecoveryPlan.put(replica, replicaPartitions);
- } else {
- partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
- }
- }
+ addActiveReplica(replica, partition, partitionRecoveryPlan);
+ // bug? will always break on first loop execution
break;
}
}
@@ -382,11 +357,12 @@
} else {
LOGGER.info("Partitions to recover: " + lostPartitions);
}
- ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
.getCCApplicationContext().getMessageBroker();
//For each replica, send a request to takeover the assigned partitions
- for (String replica : partitionRecoveryPlan.keySet()) {
- Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
+ for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
+ String replica = entry.getKey();
+ Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
long requestId = clusterRequestId++;
TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
replica, partitionsToTakeover);
@@ -399,13 +375,25 @@
* has failed. When the failure notification arrives, we will send any pending request
* that belongs to the failed NC to a different active replica.
*/
- LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
- e.printStackTrace();
+ LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
}
}
}
}
+ private void addActiveReplica(String replica, ClusterPartition partition,
+ Map<String, List<Integer>> partitionRecoveryPlan) {
+ if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+ if (!partitionRecoveryPlan.containsKey(replica)) {
+ List<Integer> replicaPartitions = new ArrayList<>();
+ replicaPartitions.add(partition.getPartitionId());
+ partitionRecoveryPlan.put(replica, replicaPartitions);
+ } else {
+ partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+ }
+ }
+ }
+
private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
List<ClusterPartition> nodePartitions = new ArrayList<>();
for (ClusterPartition partition : clusterPartitions.values()) {
@@ -436,11 +424,11 @@
private synchronized void requestMetadataNodeTakeover() {
//need a new node to takeover metadata node
- ClusterPartition metadataPartiton = AsterixAppContextInfo.getInstance().getMetadataProperties()
+ ClusterPartition metadataPartiton = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
.getMetadataPartition();
//request the metadataPartition node to register itself as the metadata node
TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
- ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
.getCCApplicationContext().getMessageBroker();
try {
messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
@@ -450,8 +438,8 @@
* has failed. When the failure notification arrives, a new NC will be assigned to
* the metadata partition and a new metadata node takeover request will be sent to it.
*/
- LOGGER.warning("Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId());
- e.printStackTrace();
+ LOGGER.log(Level.WARNING,
+ "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
}
}
@@ -479,7 +467,7 @@
planId2FailbackPlanMap.put(plan.getPlanId(), plan);
//get all partitions this node requires to resync
- AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+ AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
.getReplicationProperties();
Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
for (String replicaId : nodeReplicas) {
@@ -531,7 +519,7 @@
* if the returning node is the original metadata node,
* then metadata node will change after the failback completes
*/
- String originalMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties()
+ String originalMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
.getMetadataNodeName();
if (originalMetadataNode.equals(failbackNode)) {
plan.setNodeToReleaseMetadataManager(currentMetadataNode);
@@ -541,23 +529,9 @@
//force new jobs to wait
state = ClusterState.REBALANCING;
-
- ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
.getCCApplicationContext().getMessageBroker();
- //send requests to other nodes to complete on-going jobs and prepare partitions for failback
- Set<PreparePartitionsFailbackRequestMessage> planFailbackRequests = plan.getPlanFailbackRequests();
- for (PreparePartitionsFailbackRequestMessage request : planFailbackRequests) {
- try {
- messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
- plan.addPendingRequest(request);
- } catch (Exception e) {
- LOGGER.warning("Failed to send failback request to: " + request.getNodeID());
- e.printStackTrace();
- plan.notifyNodeFailure(request.getNodeID());
- revertFailedFailbackPlanEffects();
- break;
- }
- }
+ handleFailbackRequests(plan, messageBroker);
/**
* wait until the current plan is completed before processing the next plan.
* when the current one completes or is reverted, the cluster state will be
@@ -572,6 +546,21 @@
}
}
+ private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
+ //send requests to other nodes to complete on-going jobs and prepare partitions for failback
+ for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
+ try {
+ messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
+ plan.addPendingRequest(request);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
+ plan.notifyNodeFailure(request.getNodeID());
+ revertFailedFailbackPlanEffects();
+ break;
+ }
+ }
+ }
+
public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
plan.markRequestCompleted(msg.getRequestId());
@@ -585,13 +574,12 @@
CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
//send complete resync and takeover partitions to the failing back node
- ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
.getCCApplicationContext().getMessageBroker();
try {
messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
} catch (Exception e) {
- LOGGER.warning("Failed to send complete failback request to: " + request.getNodeId());
- e.printStackTrace();
+ LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
notifyFailbackPlansNodeFailure(request.getNodeId());
revertFailedFailbackPlanEffects();
}
@@ -617,7 +605,7 @@
}
private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
- AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+ AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
.getReplicationProperties();
Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
String nodeIdAddress = "";
@@ -627,7 +615,7 @@
}
ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
- ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
.getCCApplicationContext().getMessageBroker();
for (String replica : remoteReplicas) {
//if the remote replica is alive, send the event
@@ -635,7 +623,7 @@
try {
messageBroker.sendApplicationMessageToNC(msg, replica);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
}
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java
new file mode 100644
index 0000000..abe94bf
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.util;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.common.IStorageManagerInterface;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.file.IFileMapProvider;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+
+public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
+ ILSMIOOperationSchedulerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
+
+ private AsterixRuntimeComponentsProvider() {
+ }
+
+ @Override
+ public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) {
+ return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getLSMIOScheduler();
+ }
+
+ @Override
+ public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
+ return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getBufferCache();
+ }
+
+ @Override
+ public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
+ return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getFileMapManager();
+ }
+
+ @Override
+ public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
+ return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getLocalResourceRepository();
+ }
+
+ @Override
+ public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+ return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getDatasetLifecycleManager();
+ }
+
+ @Override
+ public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+ return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getResourceIdFactory();
+ }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
similarity index 74%
rename from asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
index 0e9aa0c..2517df5 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.om.util;
+package org.apache.asterix.runtime.util;
import java.net.InetAddress;
import java.util.ArrayList;
@@ -26,37 +26,40 @@
import java.util.Map;
import java.util.Set;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.cc.ClusterControllerService;
/**
* Utility class for obtaining information on the set of Hyracks NodeController
* processes that are running on a given host.
*/
-public class AsterixRuntimeUtil {
+public class RuntimeUtils {
- public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws Exception {
- Map<InetAddress, Set<String>> nodeControllerInfo = getNodeControllerMap();
- Set<String> nodeControllersAtLocation = nodeControllerInfo.get(ipAddress);
- return nodeControllersAtLocation;
+ private RuntimeUtils() {
}
- public static List<String> getAllNodeControllers() throws Exception {
+ public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws HyracksDataException {
+ Map<InetAddress, Set<String>> nodeControllerInfo = getNodeControllerMap();
+ return nodeControllerInfo.get(ipAddress);
+ }
+
+ public static List<String> getAllNodeControllers() throws HyracksDataException {
Collection<Set<String>> nodeControllersCollection = getNodeControllerMap().values();
- List<String> nodeControllers = new ArrayList<String>();
+ List<String> nodeControllers = new ArrayList<>();
for (Set<String> ncCollection : nodeControllersCollection) {
nodeControllers.addAll(ncCollection);
}
return nodeControllers;
}
- public static Map<InetAddress, Set<String>> getNodeControllerMap() throws Exception {
- Map<InetAddress, Set<String>> map = new HashMap<InetAddress, Set<String>>();
- AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
+ public static Map<InetAddress, Set<String>> getNodeControllerMap() throws HyracksDataException {
+ Map<InetAddress, Set<String>> map = new HashMap<>();
+ AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
return map;
}
- public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) throws Exception {
- ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance()
+ public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) {
+ ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.INSTANCE
.getCCApplicationContext().getControllerService();
map.putAll(ccs.getIpAddressNodeNameMap());
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 420e479..006eac7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -59,22 +60,27 @@
public class PersistentLocalResourceRepository implements ILocalResourceRepository {
+ // Public constants
+ public static final String METADATA_FILE_NAME = ".metadata";
+ // Private constants
private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
- private final String[] mountPoints;
private static final String STORAGE_METADATA_DIRECTORY = "asterix_root_metadata";
private static final String STORAGE_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata";
private static final long STORAGE_LOCAL_RESOURCE_ID = -4321;
- public static final String METADATA_FILE_NAME = ".metadata";
- public static final int STORAGE_VERSION = LIFOMetaDataFrame.VERSION;
- private final Cache<String, LocalResource> resourceCache;
- private final String nodeId;
private static final int MAX_CACHED_RESOURCES = 1000;
- private IReplicationManager replicationManager;
- private boolean isReplicationEnabled = false;
- private Set<String> filesToBeReplicated;
+ private static final FilenameFilter METADATA_FILES_FILTER =
+ (File dir, String name) -> name.equalsIgnoreCase(METADATA_FILE_NAME);
+ // Finals
+ private final String[] mountPoints;
+ private final String nodeId;
+ private final Cache<String, LocalResource> resourceCache;
private final SortedMap<Integer, ClusterPartition> clusterPartitions;
private final Set<Integer> nodeOriginalPartitions;
private final Set<Integer> nodeActivePartitions;
+ // Mutables
+ private boolean isReplicationEnabled = false;
+ private Set<String> filesToBeReplicated;
+ private IReplicationManager replicationManager;
private Set<Integer> nodeInactivePartitions;
public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
@@ -88,8 +94,8 @@
if (!mountPointDir.exists()) {
throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
}
- if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
- mountPoints[i] = mountPoint + System.getProperty("file.separator");
+ if (!mountPoint.endsWith(File.separator)) {
+ mountPoints[i] = mountPoint + File.separator;
} else {
mountPoints[i] = mountPoint;
}
@@ -120,9 +126,13 @@
LOGGER.info("Initializing local resource repository ... ");
}
- //create storage metadata file (This file is used to locate the root storage directory after instance restarts).
- //TODO with the existing cluster configuration file being static and distributed on all NCs, we can find out the storage root
- //directory without looking at this file. This file could potentially store more information, otherwise no need to keep it.
+ /*
+ * create storage metadata file
+ * (This file is used to locate the root storage directory after instance restarts).
+ * TODO with the existing cluster configuration file being static and distributed on all NCs
+ * we can find out the storage root directory without looking at this file.
+ * This file could potentially store more information, otherwise no need to keep it.
+ */
for (int i = 0; i < mountPoints.length; i++) {
File storageMetadataFile = getStorageMetadataFile(mountPoints[i], nodeId, i);
File storageMetadataDir = storageMetadataFile.getParentFile();
@@ -138,9 +148,9 @@
"created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath());
String storageRootDirPath;
- if (storageRootDirName.startsWith(System.getProperty("file.separator"))) {
+ if (storageRootDirName.startsWith(File.separator)) {
storageRootDirPath = mountPoints[i]
- + storageRootDirName.substring(System.getProperty("file.separator").length());
+ + storageRootDirName.substring(File.separator.length());
} else {
storageRootDirPath = mountPoints[i] + storageRootDirName;
}
@@ -215,47 +225,52 @@
return new File(resourcePath + File.separator + METADATA_FILE_NAME);
}
- public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
+ public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
//TODO During recovery, the memory usage currently is proportional to the number of resources available.
//This could be fixed by traversing all resources on disk until the required resource is found.
- HashMap<Long, LocalResource> resourcesMap = new HashMap<Long, LocalResource>();
-
+ Map<Long, LocalResource> resourcesMap = new HashMap<>();
for (int i = 0; i < mountPoints.length; i++) {
File storageRootDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
if (storageRootDir == null) {
continue;
}
-
//load all local resources.
File[] partitions = storageRootDir.listFiles();
for (File partition : partitions) {
File[] dataverseFileList = partition.listFiles();
if (dataverseFileList != null) {
for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- resourcesMap.put(localResource.getResourceId(), localResource);
- }
- }
- }
- }
- }
- }
+ loadDataverse(dataverseFile, resourcesMap);
}
}
}
}
-
return resourcesMap;
}
+ private void loadDataverse(File dataverseFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ loadIndex(indexFile, resourcesMap);
+ }
+ }
+ }
+ }
+
+ private void loadIndex(File indexFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException {
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ resourcesMap.put(localResource.getResourceId(), localResource);
+ }
+ }
+ }
+ }
+
@Override
public long getMaxResourceID() throws HyracksDataException {
long maxResourceId = 0;
@@ -273,46 +288,52 @@
File[] dataverseFileList = partition.listFiles();
if (dataverseFileList != null) {
for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
- }
- }
- }
- }
- }
- }
+ maxResourceId = getMaxResourceIdForDataverse(dataverseFile, maxResourceId);
}
}
}
}
+ return maxResourceId;
+ }
+ private long getMaxResourceIdForDataverse(File dataverseFile, long maxSoFar) throws HyracksDataException {
+ long maxResourceId = maxSoFar;
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ maxResourceId = getMaxResourceIdForIndex(indexFile, maxResourceId);
+ }
+ }
+ }
+ return maxResourceId;
+ }
+
+ private long getMaxResourceIdForIndex(File indexFile, long maxSoFar) throws HyracksDataException {
+ long maxResourceId = maxSoFar;
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
+ }
+ }
+ }
return maxResourceId;
}
private static String getFileName(String baseDir, long resourceId) {
- if (resourceId == STORAGE_LOCAL_RESOURCE_ID) {
- return baseDir;
- } else {
- if (!baseDir.endsWith(System.getProperty("file.separator"))) {
- baseDir += System.getProperty("file.separator");
- }
- return baseDir + METADATA_FILE_NAME;
- }
+ return (resourceId == STORAGE_LOCAL_RESOURCE_ID) ? baseDir
+ : baseDir.endsWith(File.separator) ? (baseDir + METADATA_FILE_NAME)
+ : (baseDir + File.separator + METADATA_FILE_NAME);
}
public static LocalResource readLocalResource(File file) throws HyracksDataException {
try (FileInputStream fis = new FileInputStream(file);
ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
LocalResource resource = (LocalResource) oisFromFis.readObject();
- if (resource.getVersion() == PersistentLocalResourceRepository.STORAGE_VERSION) {
+ if (resource.getVersion() == LIFOMetaDataFrame.VERSION) {
return resource;
} else {
throw new AsterixException("Storage version mismatch.");
@@ -322,23 +343,12 @@
}
}
- private static final FilenameFilter METADATA_FILES_FILTER = new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
- return true;
- } else {
- return false;
- }
- }
- };
-
public void setReplicationManager(IReplicationManager replicationManager) {
this.replicationManager = replicationManager;
isReplicationEnabled = replicationManager.isReplicationEnabled();
if (isReplicationEnabled) {
- filesToBeReplicated = new HashSet<String>();
+ filesToBeReplicated = new HashSet<>();
nodeInactivePartitions = ConcurrentHashMap.newKeySet();
}
}
@@ -379,12 +389,9 @@
public void deleteStorageData(boolean deleteStorageMetadata) throws IOException {
for (int i = 0; i < mountPoints.length; i++) {
File storageDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
- if (storageDir != null) {
- if (storageDir.isDirectory()) {
- FileUtils.deleteDirectory(storageDir);
- }
+ if (storageDir != null && storageDir.isDirectory()) {
+ FileUtils.deleteDirectory(storageDir);
}
-
if (deleteStorageMetadata) {
//delete the metadata root directory
File storageMetadataFile = getStorageMetadataFile(mountPoints[i], nodeId, i);
@@ -405,8 +412,7 @@
private static File getStorageMetadataFile(String mountPoint, String nodeId, int ioDeviceId) {
String storageMetadataFileName = getStorageMetadataDirPath(mountPoint, nodeId, ioDeviceId) + File.separator
+ STORAGE_METADATA_FILE_NAME_PREFIX;
- File storageMetadataFile = new File(storageMetadataFileName);
- return storageMetadataFile;
+ return new File(storageMetadataFileName);
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
index 35aeef5..f9618cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
@@ -23,12 +23,13 @@
import java.util.Set;
import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.topology.ClusterTopology;
public interface ICCContext {
public ClusterControllerInfo getClusterControllerInfo();
- public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws Exception;
+ public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException;
public ClusterTopology getClusterTopology();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index 1ce4f23..b1fa494 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -36,7 +36,6 @@
}
private JobId() {
-
}
public JobId(long id) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
similarity index 72%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index daeb9c4..bd69f9e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.messaging;
+package org.apache.hyracks.api.job;
-public class ResourceIdRequestMessage extends AbstractApplicationMessage {
- private static final long serialVersionUID = 1L;
+public class JobIdFactory {
+ private long id = 0;
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.RESOURCE_ID_REQUEST;
+ public JobId create() {
+ return new JobId(id++);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 58edf60..ae097a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -44,7 +44,9 @@
import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.service.IControllerService;
@@ -148,7 +150,7 @@
private final IDatasetDirectoryService datasetDirectoryService;
- private long jobCounter;
+ private final JobIdFactory jobIdFactory;
private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
@@ -162,8 +164,8 @@
this.ccConfig = ccConfig;
File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
jobLog = new LogFile(jobLogFolder);
- nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
- ipAddressNodeNameMap = new HashMap<InetAddress, Set<String>>();
+ nodeRegistry = new LinkedHashMap<>();
+ ipAddressNodeNameMap = new HashMap<>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
IIPCI ccIPCI = new ClusterControllerIPCI();
clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
@@ -172,7 +174,7 @@
clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this);
- activeRunMap = new HashMap<JobId, JobRun>();
+ activeRunMap = new HashMap<>();
runMapArchive = new LinkedHashMap<JobId, JobRun>() {
private static final long serialVersionUID = 1L;
@@ -195,28 +197,12 @@
workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
this.timer = new Timer(true);
final ClusterTopology topology = computeClusterTopology(ccConfig);
- ccContext = new ICCContext() {
- @Override
- public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws Exception {
- GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
- workQueue.scheduleAndSync(ginmw);
- }
-
- @Override
- public ClusterControllerInfo getClusterControllerInfo() {
- return info;
- }
-
- @Override
- public ClusterTopology getClusterTopology() {
- return topology;
- }
- };
+ ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
- jobCounter = 0;
+ jobIdFactory = new JobIdFactory();
- deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
+ deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
}
@@ -361,10 +347,6 @@
return appCtx;
}
- private JobId createJobId() {
- return new JobId(jobCounter++);
- }
-
public ClusterControllerInfo getClusterControllerInfo() {
return info;
}
@@ -381,6 +363,34 @@
return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
}
+ private final class ClusterControllerContext implements ICCContext {
+ private final ClusterTopology topology;
+
+ private ClusterControllerContext(ClusterTopology topology) {
+ this.topology = topology;
+ }
+
+ @Override
+ public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
+ GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
+ try {
+ workQueue.scheduleAndSync(ginmw);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public ClusterControllerInfo getClusterControllerInfo() {
+ return info;
+ }
+
+ @Override
+ public ClusterTopology getClusterTopology() {
+ return topology;
+ }
+ }
+
private class DeadNodeSweeper extends TimerTask {
@Override
public void run() {
@@ -393,6 +403,7 @@
}
private class HyracksClientInterfaceIPCI implements IIPCI {
+
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
Exception exception) {
@@ -426,7 +437,7 @@
case START_JOB: {
HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
- JobId jobId = createJobId();
+ JobId jobId = jobIdFactory.create();
workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
return;
@@ -693,5 +704,4 @@
public synchronized ShutdownRun getShutdownRun() {
return shutdownCallback;
}
-
}