[NO ISSUE] More multi-CC support, ConfigManager updates
- add ability for OptionTypes to natively parse JsonNodes
- allow all options to be overridden at the NC level, not just NC options (i.e. common and cc options); see the config sketch below
- accept the controller id from the CC instead of configuring it on each NC
- update all CCs with metadata bootstrap, not just the primary CC
- remove the TxnIdFactory static singleton; txn ids are now managed by the metadata node (see the factory sketch below)
- remove unused build-properties-style test configs
- clean up test iodevices
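
Illustrative config sketch (not part of this change): with NC-level overrides enabled, a non-NC option can be pinned for a single node inside its [nc/...] section. The section and option names below are taken from the existing test configs in this patch; treating compiler.framesize as a common-section option is an assumption.

[nc/asterix_nc1]
txn.log.dir=target/tmp/asterix_nc1/txnlog
# assumed example: a common (non-NC) option overridden for this NC only
compiler.framesize=32KB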
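
Illustrative Java sketch (not part of this change): one way an NC-side ITxnIdFactory could sit on top of the new ITxnIdBlockProvider, caching a block of ids reserved through the metadata node so that most id creation avoids a remote call. The class name, package, and block size are assumptions, not names introduced by this patch.

package org.apache.asterix.example; // hypothetical package, for illustration only

import java.rmi.RemoteException;

import org.apache.asterix.common.transactions.ITxnIdBlockProvider;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;

public class BlockCachingTxnIdFactory implements ITxnIdFactory {

    private static final int BLOCK_SIZE = 1024; // assumed block size
    private final ITxnIdBlockProvider blockProvider;
    private long nextId;
    private long blockEnd; // exclusive end of the currently cached block

    public BlockCachingTxnIdFactory(ITxnIdBlockProvider blockProvider) {
        this.blockProvider = blockProvider;
    }

    @Override
    public synchronized TxnId create() throws AlgebricksException {
        if (nextId >= blockEnd) {
            try {
                // reserve a fresh block from the metadata node when the cached one is exhausted
                nextId = blockProvider.reserveTxnIdBlock(BLOCK_SIZE);
                blockEnd = nextId + BLOCK_SIZE;
            } catch (RemoteException e) {
                throw new AlgebricksException(e);
            }
        }
        return new TxnId(nextId++);
    }

    @Override
    public synchronized void ensureMinimumId(long id) throws AlgebricksException {
        try {
            // propagate the floor cluster-wide so blocks reserved later start above it
            blockProvider.ensureMinimumTxnId(id);
        } catch (RemoteException e) {
            throw new AlgebricksException(e);
        }
        // also advance the locally cached cursor past the supplied id
        nextId = Math.max(nextId, id + 1);
    }
}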
Change-Id: Iff60887bf71ce3f3ed7201afd9499612bfc83485
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2344
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index a73175a..9c7ca5f 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -34,11 +34,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-transactions</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
</dependency>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
index e12c5ca..bc6f1b1 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
@@ -25,7 +25,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
@@ -48,13 +48,15 @@
//Starts running a deployed job specification periodically with an interval of "duration" seconds
public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
- IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId) {
+ IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId,
+ ITxnIdFactory txnIdFactory) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
- if (!runRepetitiveDeployedJobSpec(distributedId, hcc, jobParameters, duration, entityId)) {
+ if (!runRepetitiveDeployedJobSpec(distributedId, hcc, jobParameters, duration, entityId,
+ txnIdFactory)) {
scheduledExecutorService.shutdown();
}
} catch (Exception e) {
@@ -67,8 +69,9 @@
}
public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
- Map<byte[], byte[]> jobParameters, long duration, EntityId entityId) throws Exception {
- long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, jobParameters, entityId);
+ Map<byte[], byte[]> jobParameters, long duration, EntityId entityId, ITxnIdFactory txnIdFactory)
+ throws Exception {
+ long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory);
if (executionMilliseconds > duration && LOGGER.isErrorEnabled()) {
LOGGER.log(Level.ERROR,
"Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
@@ -81,12 +84,12 @@
}
public synchronized static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
- Map<byte[], byte[]> jobParameters, EntityId entityId) throws Exception {
+ Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory) throws Exception {
JobId jobId;
long startTime = Instant.now().toEpochMilli();
//Add the Asterix Transaction Id to the map
- jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, String.valueOf(TxnIdFactory.create().getId()).getBytes());
+ jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, String.valueOf(txnIdFactory.create().getId()).getBytes());
jobId = hcc.startJob(distributedId, jobParameters);
hcc.waitForCompletion(jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index a18277e..189a7e1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -65,7 +65,6 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.optimizer.base.FuzzyUtils;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
@@ -220,7 +219,7 @@
printPlanPostfix(output);
}
- final TxnId txnId = TxnIdFactory.create();
+ final TxnId txnId = metadataProvider.getTxnIdFactory().create();
metadataProvider.setTxnId(txnId);
ILangExpressionToPlanTranslator t =
translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 366438a..1de6938 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -25,6 +25,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -77,6 +79,7 @@
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -414,20 +417,21 @@
@Override
public void initializeMetadata(boolean newUniverse) throws Exception {
- IAsterixStateProxy proxy;
LOGGER.info("Bootstrapping metadata");
MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(),
ncExtensionManager.getMetadataExtensions());
- proxy = (IAsterixStateProxy) getServiceContext().getDistributedState();
- if (proxy == null) {
+ //noinspection unchecked
+ ConcurrentHashMap<CcId, IAsterixStateProxy> proxyMap =
+ (ConcurrentHashMap<CcId, IAsterixStateProxy>) getServiceContext().getDistributedState();
+ if (proxyMap == null) {
throw new IllegalStateException("Metadata node cannot access distributed state");
}
// This is a special case, we just give the metadataNode directly.
// This way we can delay the registration of the metadataNode until
// it is completely initialized.
- MetadataManager.initialize(proxy, MetadataNode.INSTANCE);
+ MetadataManager.initialize(proxyMap.values(), MetadataNode.INSTANCE);
MetadataBootstrap.startUniverse(getServiceContext(), newUniverse);
MetadataBootstrap.startDDLRecovery();
ncExtensionManager.initializeMetadata(getServiceContext());
@@ -440,7 +444,6 @@
metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
getMetadataProperties().getMetadataPort());
}
- ((IAsterixStateProxy) getServiceContext().getDistributedState()).setMetadataNode(metadataNodeStub);
}
@Override
@@ -451,6 +454,17 @@
metadataNodeStub = null;
}
+ @Override
+ public synchronized void bindMetadataNodeStub(CcId ccId) throws RemoteException {
+ if (metadataNodeStub == null) {
+ throw new IllegalStateException("Metadata node not exported");
+
+ }
+ //noinspection unchecked
+ ((ConcurrentMap<CcId, IAsterixStateProxy>) getServiceContext().getDistributedState()).get(ccId)
+ .setMetadataNode(metadataNodeStub);
+ }
+
public NCExtensionManager getNcExtensionManager() {
return ncExtensionManager;
}
@@ -472,7 +486,9 @@
if (hcc == null || !hcc.isConnected()) {
try {
NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService();
- ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
+ // TODO(mblow): multicc
+ CcId primaryCcId = ncSrv.getPrimaryCcId();
+ ClusterControllerInfo ccInfo = ncSrv.getNodeParameters(primaryCcId).getClusterControllerInfo();
hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
} catch (Exception e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
index e41bc60..701cb96 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -27,21 +27,12 @@
public class BindMetadataNodeTask implements INCLifecycleTask {
private static final long serialVersionUID = 1L;
- private final boolean exportStub;
-
- public BindMetadataNodeTask(boolean exportStub) {
- this.exportStub = exportStub;
- }
@Override
public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
- if (exportStub) {
- appContext.exportMetadataNodeStub();
- } else {
- appContext.unexportMetadataNodeStub();
- }
+ appContext.bindMetadataNodeStub(ccId);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -49,6 +40,6 @@
@Override
public String toString() {
- return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"export-stub\" : " + exportStub + " }";
+ return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExportMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExportMetadataNodeTask.java
new file mode 100644
index 0000000..c833850
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExportMetadataNodeTask.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+public class ExportMetadataNodeTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+ private final boolean exportStub;
+
+ public ExportMetadataNodeTask(boolean exportStub) {
+ this.exportStub = exportStub;
+ }
+
+ @Override
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
+ INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
+ try {
+ if (exportStub) {
+ appContext.exportMetadataNodeStub();
+ } else {
+ appContext.unexportMetadataNodeStub();
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"export-stub\" : " + exportStub + " }";
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 980375d..8939059 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -27,6 +27,7 @@
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
@@ -157,21 +158,20 @@
tasks.add(new MetadataBootstrapTask());
}
tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
- tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
if (isMetadataNode) {
- tasks.add(new BindMetadataNodeTask(true));
+ tasks.add(new ExportMetadataNodeTask(true));
+ tasks.add(new BindMetadataNodeTask());
}
+ tasks.add(new ReportLocalCountersTask());
return tasks;
}
protected List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
if (metadataNode) {
- // need to unbind from old distributed state then rebind to new one
- tasks.add(new BindMetadataNodeTask(false));
- tasks.add(new BindMetadataNodeTask(true));
+ tasks.add(new BindMetadataNodeTask());
}
tasks.add(new ReportLocalCountersTask());
return tasks;
@@ -182,7 +182,7 @@
if (metadataNodeId.equals(node)) {
return;
}
- // if current metadata node is active, we need to unbind its metadata proxy object
+ // if current metadata node is active, we need to unbind its metadata proxy objects
if (clusterManager.isMetadataNodeActive()) {
MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false);
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
index a8c98c7..4443825 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
@@ -47,6 +47,7 @@
if (export) {
appContext.initializeMetadata(false);
appContext.exportMetadataNodeStub();
+ appContext.bindMetadataNodeStub(getCcId());
} else {
appContext.unexportMetadataNodeStub();
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 1cefd42..551e6aa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -152,13 +152,13 @@
appCtx.setExtensionManager(ccExtensionManager);
final CCConfig ccConfig = controllerService.getCCConfig();
if (System.getProperty("java.rmi.server.hostname") == null) {
- System.setProperty("java.rmi.server.hostname", ccConfig.getClusterListenAddress());
+ System.setProperty("java.rmi.server.hostname", ccConfig.getClusterPublicAddress());
}
MetadataProperties metadataProperties = appCtx.getMetadataProperties();
setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort()));
ccServiceCtx.setDistributedState(proxy);
- MetadataManager.initialize(proxy, metadataProperties);
+ MetadataManager.initialize(proxy, metadataProperties, appCtx);
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
// create event loop groups
@@ -178,7 +178,7 @@
throws AlgebricksException, IOException {
return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
- new MetadataLockManager());
+ new MetadataLockManager(), MetadataManager::getTxnIdBlockFactory);
}
protected GlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index a23a763..1220462 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -199,22 +199,20 @@
&& (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
state = SystemState.BOOTSTRAPPING;
}
- // Request registration tasks from CC
- // TODO (mblow): multicc
+ // Request registration tasks from CC (we only do this from our primary CC, in the case of multiple CCs)
final NodeControllerService ncControllerService = (NodeControllerService) ncServiceCtx.getControllerService();
- RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryClusterController().getCcId(),
- ncControllerService, NodeStatus.BOOTING, state);
+ RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryCcId(), ncControllerService,
+ NodeStatus.BOOTING, state);
startupCompleted = true;
}
@Override
public void onRegisterNode(CcId ccId) throws Exception {
- // TODO (mblow): multicc
- if (startupCompleted && ccId.equals(((NodeControllerService) ncServiceCtx.getControllerService())
- .getPrimaryClusterController().getCcId())) {
+ if (startupCompleted) {
/*
* If the node completed its startup before, then this is a re-registration with
- * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE
+ * the primary (or supplemental) CC and therefore the system state should be HEALTHY and the node status
+ * is ACTIVE
*/
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
NodeStatus.ACTIVE, SystemState.HEALTHY);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 22458d3..7a74940 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -75,7 +75,7 @@
@Override
public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception {
- sendMessageToCC(ncs.getPrimaryClusterController().getCcId(), message);
+ sendMessageToCC(ncs.getPrimaryCcId(), message);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index f446c4b..c37d8cc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -26,7 +26,6 @@
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
-import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -60,7 +59,7 @@
AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
- TxnId txnId = TxnIdFactory.create();
+ TxnId txnId = metadataProvider.getTxnIdFactory().create();
FlushDatasetOperatorDescriptor flushOperator =
new FlushDatasetOperatorDescriptor(spec, txnId, dataset.getDatasetId());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 80275a5..7bb917f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -47,7 +47,6 @@
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.rebalance.IDatasetRebalanceCallback;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -284,7 +283,7 @@
private static void populateDataToRebalanceTarget(Dataset source, Dataset target, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws Exception {
JobSpecification spec = new JobSpecification();
- TxnId txnId = TxnIdFactory.create();
+ TxnId txnId = metadataProvider.getTxnIdFactory().create();
JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, true);
spec.setJobletEventListenerFactory(jobEventListenerFactory);
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
deleted file mode 100644
index 7eba9eb..0000000
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<asterixConfiguration xmlns="asterixconf">
- <metadataNode>asterix_nc1</metadataNode>
- <store>
- <ncId>asterix_nc1</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <store>
- <ncId>asterix_nc2</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <transactionLogDir>
- <ncId>asterix_nc1</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
- </transactionLogDir>
- <transactionLogDir>
- <ncId>asterix_nc2</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
- </transactionLogDir>
-
- <property>
- <name>max.wait.active.cluster</name>
- <value>60</value>
- <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
- nodes are available)
- before a submitted query/statement can be
- executed. (Default = 60 seconds)
- </description>
- </property>
-
- <property>
- <name>compiler.framesize</name>
- <value>32KB</value>
- </property>
- <property>
- <name>compiler.sortmemory</name>
- <value>320KB</value>
- </property>
- <property>
- <name>compiler.groupmemory</name>
- <value>160KB</value>
- </property>
- <property>
- <name>compiler.joinmemory</name>
- <value>256KB</value>
- </property>
- <property>
- <name>storage.buffercache.pagesize</name>
- <value>32KB</value>
- <description>The page size in bytes for pages in the buffer cache.
- (Default = "128KB")
- </description>
- </property>
- <property>
- <name>storage.buffercache.size</name>
- <value>48MB</value>
- <description>The size of memory allocated to the disk buffer cache.
- The value should be a multiple of the buffer cache page size.
- </description>
- </property>
- <property>
- <name>storage.memorycomponent.numpages</name>
- <value>16</value>
- <description>The number of pages to allocate for a memory component.
- This budget is shared by all the memory components of the primary
- index and all its secondary indexes across all I/O devices on a node.
- Note: in-memory components usually has fill factor of 75% since
- the pages are 75% full and the remaining 25% is un-utilized.
- </description>
- </property>
- <property>
- <name>storage.memorycomponent.globalbudget</name>
- <value>512MB</value>
- <description>The size of memory allocated to the memory components.
- The value should be a multiple of the memory component page size.
- </description>
- </property>
- <property>
- <name>messaging.frame.size</name>
- <value>4096</value>
- <description>The frame size to be used for NC to NC messaging. (Default = 4kb)
- </description>
- </property>
- <property>
- <name>messaging.frame.count</name>
- <value>512</value>
- <description>Number of reusable frames for NC to NC messaging. (Default = 512)
- </description>
- </property>
- <property>
- <name>log.level</name>
- <value>INFO</value>
- <description>foo</description>
- </property>
-</asterixConfiguration>
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
deleted file mode 100644
index af17901..0000000
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<asterixConfiguration xmlns="asterixconf">
- <metadataNode>asterix_nc1</metadataNode>
- <store>
- <ncId>asterix_nc1</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <store>
- <ncId>asterix_nc2</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <transactionLogDir>
- <ncId>asterix_nc1</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
- </transactionLogDir>
- <transactionLogDir>
- <ncId>asterix_nc2</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
- </transactionLogDir>
-
- <property>
- <name>max.wait.active.cluster</name>
- <value>60</value>
- <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
- nodes are available)
- before a submitted query/statement can be
- executed. (Default = 60 seconds)
- </description>
- </property>
-
- <property>
- <name>compiler.framesize</name>
- <value>32KB</value>
- </property>
- <property>
- <name>compiler.sortmemory</name>
- <value>320KB</value>
- </property>
- <property>
- <name>compiler.groupmemory</name>
- <value>160KB</value>
- </property>
- <property>
- <name>compiler.joinmemory</name>
- <value>256KB</value>
- </property>
- <property>
- <name>compiler.parallelism</name>
- <value>-1</value>
- </property>
- <property>
- <name>storage.buffercache.pagesize</name>
- <value>32KB</value>
- <description>The page size in bytes for pages in the buffer cache.
- (Default = "128KB")
- </description>
- </property>
- <property>
- <name>storage.buffercache.size</name>
- <value>48MB</value>
- <description>The size of memory allocated to the disk buffer cache.
- The value should be a multiple of the buffer cache page size.
- </description>
- </property>
- <property>
- <name>storage.memorycomponent.numpages</name>
- <value>8</value>
- <description>The number of pages to allocate for a memory component.
- This budget is shared by all the memory components of the primary
- index and all its secondary indexes across all I/O devices on a node.
- Note: in-memory components usually has fill factor of 75% since
- the pages are 75% full and the remaining 25% is un-utilized.
- </description>
- </property>
- <property>
- <name>storage.memorycomponent.globalbudget</name>
- <value>512MB</value>
- <description>The size of memory allocated to the memory components.
- The value should be a multiple of the memory component page size.
- </description>
- </property>
- <property>
- <name>messaging.frame.size</name>
- <value>4096</value>
- <description>The frame size to be used for NC to NC messaging. (Default = 4kb)
- </description>
- </property>
- <property>
- <name>messaging.frame.count</name>
- <value>512</value>
- <description>Number of reusable frames for NC to NC messaging. (Default = 512)
- </description>
- </property>
-</asterixConfiguration>
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
deleted file mode 100644
index 06ac7b5..0000000
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<asterixConfiguration xmlns="asterixconf">
- <metadataNode>asterix_nc1</metadataNode>
- <store>
- <ncId>asterix_nc1</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <store>
- <ncId>asterix_nc2</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <transactionLogDir>
- <ncId>asterix_nc1</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
- </transactionLogDir>
- <transactionLogDir>
- <ncId>asterix_nc2</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
- </transactionLogDir>
-
- <property>
- <name>max.wait.active.cluster</name>
- <value>60</value>
- <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
- nodes are available)
- before a submitted query/statement can be
- executed. (Default = 60 seconds)
- </description>
- </property>
-
- <property>
- <name>compiler.framesize</name>
- <value>32KB</value>
- </property>
- <property>
- <name>compiler.sortmemory</name>
- <value>320KB</value>
- </property>
- <property>
- <name>compiler.groupmemory</name>
- <value>160KB</value>
- </property>
- <property>
- <name>compiler.joinmemory</name>
- <value>256KB</value>
- </property>
- <property>
- <name>compiler.parallelism</name>
- <value>3</value>
- </property>
- <property>
- <name>storage.buffercache.pagesize</name>
- <value>32KB</value>
- <description>The page size in bytes for pages in the buffer cache.
- (Default = "128KB")
- </description>
- </property>
- <property>
- <name>storage.buffercache.size</name>
- <value>48MB</value>
- <description>The size of memory allocated to the disk buffer cache.
- The value should be a multiple of the buffer cache page size.
- </description>
- </property>
- <property>
- <name>storage.memorycomponent.numpages</name>
- <value>8</value>
- <description>The number of pages to allocate for a memory component.
- This budget is shared by all the memory components of the primary
- index and all its secondary indexes across all I/O devices on a node.
- Note: in-memory components usually has fill factor of 75% since
- the pages are 75% full and the remaining 25% is un-utilized.
- </description>
- </property>
- <property>
- <name>storage.memorycomponent.globalbudget</name>
- <value>512MB</value>
- <description>The size of memory allocated to the memory components.
- The value should be a multiple of the memory component page size.
- </description>
- </property>
- <property>
- <name>messaging.frame.size</name>
- <value>4096</value>
- <description>The frame size to be used for NC to NC messaging. (Default = 4kb)
- </description>
- </property>
- <property>
- <name>messaging.frame.count</name>
- <value>512</value>
- <description>Number of reusable frames for NC to NC messaging. (Default = 512)
- </description>
- </property>
-</asterixConfiguration>
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration4.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration4.xml
deleted file mode 100644
index bfa51dd..0000000
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration4.xml
+++ /dev/null
@@ -1,87 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<asterixConfiguration xmlns="asterixconf">
- <metadataNode>asterix_nc1</metadataNode>
- <store>
- <ncId>asterix_nc1</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <store>
- <ncId>asterix_nc2</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <transactionLogDir>
- <ncId>asterix_nc1</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
- </transactionLogDir>
- <transactionLogDir>
- <ncId>asterix_nc2</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
- </transactionLogDir>
-
- <property>
- <name>max.wait.active.cluster</name>
- <value>60</value>
- <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
- nodes are available)
- before a submitted query/statement can be
- executed. (Default = 60 seconds)
- </description>
- </property>
-
- <property>
- <name>compiler.framesize</name>
- <value>32KB</value>
- </property>
- <property>
- <name>compiler.sortmemory</name>
- <value>320KB</value>
- </property>
- <property>
- <name>compiler.groupmemory</name>
- <value>160KB</value>
- </property>
- <property>
- <name>compiler.joinmemory</name>
- <value>256KB</value>
- </property>
- <property>
- <name>compiler.parallelism</name>
- <value>-1</value>
- </property>
- <property>
- <name>storage.buffercache.pagesize</name>
- <value>32KB</value>
- <description>The page size in bytes for pages in the buffer cache.
- (Default = "128KB")
- </description>
- </property>
- <property>
- <name>messaging.frame.size</name>
- <value>4096</value>
- <description>The frame size to be used for NC to NC messaging. (Default = 4kb)
- </description>
- </property>
- <property>
- <name>messaging.frame.count</name>
- <value>512</value>
- <description>Number of reusable frames for NC to NC messaging. (Default = 512)
- </description>
- </property>
-</asterixConfiguration>
diff --git a/asterixdb/asterix-app/src/main/resources/cc-rep.conf b/asterixdb/asterix-app/src/main/resources/cc-rep.conf
index 885201f..1f4e5a5 100644
--- a/asterixdb/asterix-app/src/main/resources/cc-rep.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc-rep.conf
@@ -18,7 +18,7 @@
[nc/asterix_nc1]
txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog
core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump
-iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+iodevices=asterix_nc1/iodevice1,asterix_nc1/iodevice2
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
replication.listen.port=2001
nc.api.port=19004
@@ -27,7 +27,7 @@
ncservice.port=9091
txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog
core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump
-iodevices=../asterix-server/target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+iodevices=asterix_nc2/iodevice1,asterix_nc2/iodevice2
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
replication.listen.port=2002
nc.api.port=19005
diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf
index c5a3fdb..914b7b6 100644
--- a/asterixdb/asterix-app/src/main/resources/cc.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc.conf
@@ -18,7 +18,7 @@
[nc/asterix_nc1]
txn.log.dir=target/tmp/asterix_nc1/txnlog
core.dump.dir=target/tmp/asterix_nc1/coredump
-iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+iodevices=asterix_nc1/iodevice1,asterix_nc1/iodevice2
nc.api.port=19004
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
@@ -26,7 +26,7 @@
ncservice.port=9091
txn.log.dir=target/tmp/asterix_nc2/txnlog
core.dump.dir=target/tmp/asterix_nc2/coredump
-iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+iodevices=asterix_nc2/iodevice1,asterix_nc2/iodevice2
nc.api.port=19005
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
diff --git a/asterixdb/asterix-app/src/main/resources/cc2.conf b/asterixdb/asterix-app/src/main/resources/cc2.conf
index 941b6c1..6c01386 100644
--- a/asterixdb/asterix-app/src/main/resources/cc2.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc2.conf
@@ -18,7 +18,7 @@
[nc/asterix_nc1]
txn.log.dir=target/tmp/asterix_nc1/txnlog
core.dump.dir=target/tmp/asterix_nc1/coredump
-iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+iodevices=asterix_nc1/iodevice1,asterix_nc1/iodevice2
nc.api.port=19004
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
@@ -26,7 +26,7 @@
ncservice.port=9091
txn.log.dir=target/tmp/asterix_nc2/txnlog
core.dump.dir=target/tmp/asterix_nc2/coredump
-iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+iodevices=asterix_nc2/iodevice1,asterix_nc2/iodevice2
nc.api.port=19005
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
diff --git a/asterixdb/asterix-app/src/main/resources/cc3.conf b/asterixdb/asterix-app/src/main/resources/cc3.conf
index 01383ef..933e4af 100644
--- a/asterixdb/asterix-app/src/main/resources/cc3.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc3.conf
@@ -18,7 +18,7 @@
[nc/asterix_nc1]
txn.log.dir=target/tmp/asterix_nc1/txnlog
core.dump.dir=target/tmp/asterix_nc1/coredump
-iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+iodevices=asterix_nc1/iodevice1,asterix_nc1/iodevice2
nc.api.port=19004
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
@@ -26,7 +26,7 @@
ncservice.port=9091
txn.log.dir=target/tmp/asterix_nc2/txnlog
core.dump.dir=target/tmp/asterix_nc2/coredump
-iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+iodevices=asterix_nc2/iodevice1,asterix_nc2/iodevice2
nc.api.port=19005
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
diff --git a/asterixdb/asterix-app/src/main/resources/cc4.conf b/asterixdb/asterix-app/src/main/resources/cc4.conf
index bcbf6b1..3b7a993 100644
--- a/asterixdb/asterix-app/src/main/resources/cc4.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc4.conf
@@ -18,7 +18,7 @@
[nc/asterix_nc1]
txn.log.dir=target/tmp/asterix_nc1/txnlog
core.dump.dir=target/tmp/asterix_nc1/coredump
-iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+iodevices=asterix_nc1/iodevice1,asterix_nc1/iodevice2
nc.api.port=19004
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
@@ -26,7 +26,7 @@
ncservice.port=9091
txn.log.dir=target/tmp/asterix_nc2/txnlog
core.dump.dir=target/tmp/asterix_nc2/coredump
-iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+iodevices=asterix_nc2/iodevice1,asterix_nc2/iodevice2
nc.api.port=19005
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml
deleted file mode 100644
index 41be696..0000000
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ /dev/null
@@ -1,56 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<cluster xmlns="cluster">
- <instance_name>asterix</instance_name>
- <store>storage</store>
- <metadata_node>nc1</metadata_node>
-
- <high_availability>
- <enabled>false</enabled>
- <data_replication>
- <strategy>metadata_only</strategy>
- <replication_port>2016</replication_port>
- <replication_time_out>30</replication_time_out>
- </data_replication>
- <fault_tolerance>
- <strategy>metadata_node</strategy>
- </fault_tolerance>
- </high_availability>
-
- <master_node>
- <id>master</id>
- <client_ip>127.0.0.1</client_ip>
- <cluster_ip>127.0.0.1</cluster_ip>
- <client_port>1098</client_port>
- <cluster_port>1099</cluster_port>
- <http_port>8888</http_port>
- </master_node>
- <node>
- <id>nc1</id>
- <cluster_ip>127.0.0.1</cluster_ip>
- <replication_port>2016</replication_port>
- <nc_api_port>19004</nc_api_port>
- </node>
- <node>
- <id>nc2</id>
- <cluster_ip>127.0.0.1</cluster_ip>
- <replication_port>2017</replication_port>
- <nc_api_port>19005</nc_api_port>
- </node>
-</cluster>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/resources/log4j2.xml b/asterixdb/asterix-app/src/main/resources/log4j2.xml
index 24bcf77..1debf82 100644
--- a/asterixdb/asterix-app/src/main/resources/log4j2.xml
+++ b/asterixdb/asterix-app/src/main/resources/log4j2.xml
@@ -26,5 +26,6 @@
<Root level="WARN">
<AppenderRef ref="Console"/>
</Root>
+ <Logger name="org.apache.hyracks.control.nc.service" level="INFO"/>
</Loggers>
</Configuration>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
index 387e900..c6ecd7d 100644
--- a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
+++ b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
@@ -32,19 +32,14 @@
<Root level="WARN">
<AppenderRef ref="InfoLog"/>
</Root>
- <Logger name="org.apache.hyracks" level="INFO" additivity="false">
- <AppenderRef ref="InfoLog"/>
- </Logger>
- <Logger name="org.apache.asterix" level="INFO" additivity="false">
- <AppenderRef ref="InfoLog"/>
- </Logger>
- <Logger name="org.apache.hyracks.test" level="INFO" additivity="false">
+ <Logger name="org.apache.hyracks.control.nc.service" level="INFO"/>
+ <Logger name="org.apache.hyracks" level="INFO"/>
+ <Logger name="org.apache.asterix" level="INFO"/>
+ <Logger name="org.apache.hyracks.test" level="INFO">
<AppenderRef ref="ConsoleTest"/>
- <AppenderRef ref="InfoLog"/>
</Logger>
- <Logger name="org.apache.asterix.test" level="INFO" additivity="false">
+ <Logger name="org.apache.asterix.test" level="INFO">
<AppenderRef ref="ConsoleTest"/>
- <AppenderRef ref="InfoLog"/>
</Logger>
</Loggers>
</Configuration>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index fffc170..a02bda5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -105,6 +106,13 @@
void unexportMetadataNodeStub() throws RemoteException;
/**
+ * Binds the exported metadata node to the CC's distributed state.
+ *
+ * @throws RemoteException
+ */
+ void bindMetadataNodeStub(CcId ccId) throws RemoteException;
+
+ /**
* @return instance of {@link org.apache.asterix.common.context.IStorageComponentProvider}
*/
IStorageComponentProvider getStorageComponentProvider();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index e02482d..18e3327 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ExtensionProperties;
@@ -121,4 +122,9 @@
* @return the node job tracker
*/
INodeJobTracker getNodeJobTracker();
+
+ /**
+ * @return the transaction id factory
+ */
+ ITxnIdFactory getTxnIdFactory();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILongBlockFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILongBlockFactory.java
new file mode 100644
index 0000000..b5ee0a8
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILongBlockFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public interface ILongBlockFactory {
+ /**
+ * Ensures that future blocks are allocated entirely above the supplied value
+ *
+ * @param value
+ * the value that all future blocks must be allocated above
+ */
+ void ensureMinimum(long value) throws AlgebricksException;
+
+ /**
+ * Allocates a block of longs of specified block size
+ *
+ * @param blockSize
+ * The size of the block of longs to reserve
+ * @return the start of the reserved block
+ */
+ long getBlock(int blockSize) throws AlgebricksException;
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdBlockProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdBlockProvider.java
new file mode 100644
index 0000000..94ca848
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdBlockProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.transactions;
+
+import java.io.Serializable;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+
+public interface ITxnIdBlockProvider extends Remote, Serializable {
+
+ /**
+ * Ensures that future transaction id blocks are allocated entirely above the supplied value
+ *
+ * @param maxId
+ * the txn id that all future txn ids must be larger than
+ * @throws RemoteException
+ */
+ void ensureMinimumTxnId(long maxId) throws RemoteException;
+
+ /**
+ * Allocates a block of transaction ids of specified block size
+ *
+ * @param blockSize
+ * The size of the transaction id block to reserve
+ * @return the start of the reserved block
+ * @throws RemoteException
+ */
+ long reserveTxnIdBlock(int blockSize) throws RemoteException;
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
new file mode 100644
index 0000000..3c60432
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public interface ITxnIdFactory {
+ /**
+ * Creates a new unique transaction id. The implementation must ensure this id is unique within the cluster
+ *
+ * @return the new transaction id
+ */
+ TxnId create() throws AlgebricksException;
+
+ /**
+ * Ensures that future transaction ids are larger than the supplied id
+ *
+ * @param id
+ * the value that all future transaction ids must be larger than
+ */
+ void ensureMinimumId(long id) throws AlgebricksException;
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
new file mode 100644
index 0000000..8ac6b63
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.common.transactions.TxnId;
+
+class BulkTxnIdFactory implements ITxnIdFactory {
+
+ private final AtomicLong maxId = new AtomicLong();
+
+ @Override
+ public TxnId create() {
+ return new TxnId(maxId.incrementAndGet());
+ }
+
+ public long reserveIdBlock(int blockSize) {
+ if (blockSize < 1) {
+ throw new IllegalArgumentException("block size cannot be smaller than 1, but was " + blockSize);
+ }
+ return maxId.getAndAdd(blockSize) + 1;
+ }
+
+ @Override
+ public void ensureMinimumId(long id) {
+ this.maxId.getAndUpdate(next -> Math.max(next, id));
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 8578d6b..b4b304e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -21,17 +21,21 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.transactions.ILongBlockFactory;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -53,7 +57,6 @@
import org.apache.asterix.metadata.entities.Node;
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -89,9 +92,9 @@
* cluster, i.e., metadata transaction ids shall never "accidentally" overlap
* with transaction ids of regular jobs or other metadata transactions.
*/
-public class MetadataManager implements IMetadataManager {
+public abstract class MetadataManager implements IMetadataManager, ILongBlockFactory {
private final MetadataCache cache = new MetadataCache();
- protected final IAsterixStateProxy proxy;
+ protected final Collection<IAsterixStateProxy> proxies;
protected IMetadataNode metadataNode;
private final ReadWriteLock metadataLatch;
protected boolean rebindMetadataNode = false;
@@ -100,19 +103,19 @@
// update field name accordingly
public static IMetadataManager INSTANCE;
- private MetadataManager(IAsterixStateProxy proxy, IMetadataNode metadataNode) {
- this(proxy);
+ private MetadataManager(Collection<IAsterixStateProxy> proxies, MetadataNode metadataNode) {
+ this(proxies);
if (metadataNode == null) {
throw new IllegalArgumentException("Null metadataNode given to MetadataManager");
}
this.metadataNode = metadataNode;
}
- private MetadataManager(IAsterixStateProxy proxy) {
- if (proxy == null) {
- throw new IllegalArgumentException("Null proxy given to MetadataManager");
+ private MetadataManager(Collection<IAsterixStateProxy> proxies) {
+ if (proxies == null || proxies.isEmpty()) {
+ throw new IllegalArgumentException("Null / empty list of proxies given to MetadataManager");
}
- this.proxy = proxy;
+ this.proxies = proxies;
this.metadataLatch = new ReentrantReadWriteLock(true);
}
@@ -122,11 +125,7 @@
}
@Override
- public MetadataTransactionContext beginTransaction() throws RemoteException, ACIDException {
- TxnId txnId = TxnIdFactory.create();
- metadataNode.beginTransaction(txnId);
- return new MetadataTransactionContext(txnId);
- }
+ public abstract MetadataTransactionContext beginTransaction() throws RemoteException, ACIDException;
@Override
public void commitTransaction(MetadataTransactionContext ctx) throws RemoteException, ACIDException {
@@ -998,20 +997,64 @@
rebindMetadataNode = true;
}
- public static void initialize(IAsterixStateProxy proxy, MetadataProperties metadataProperties) {
- INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties);
+ @Override
+ public void ensureMinimum(long value) throws AlgebricksException {
+ try {
+ metadataNode.ensureMinimumTxnId(value);
+ } catch (RemoteException e) {
+ throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+ }
}
- public static void initialize(IAsterixStateProxy proxy, MetadataNode metadataNode) {
- INSTANCE = new MetadataManager(proxy, metadataNode);
+ @Override
+ public long getBlock(int blockSize) throws AlgebricksException {
+ try {
+ return metadataNode.reserveTxnIdBlock(blockSize);
+ } catch (RemoteException e) {
+ throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+ }
+ }
+
+ public static ILongBlockFactory getTxnIdBlockFactory() {
+ try {
+ INSTANCE.init();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ return (ILongBlockFactory) INSTANCE;
+
+ }
+
+ public static void initialize(IAsterixStateProxy proxy, MetadataProperties metadataProperties,
+ ICcApplicationContext appCtx) {
+ INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties, appCtx);
+ }
+
+ public static void initialize(Collection<IAsterixStateProxy> proxies, MetadataNode metadataNode) {
+ INSTANCE = new NCMetadataManagerImpl(proxies, metadataNode);
}
private static class CCMetadataManagerImpl extends MetadataManager {
private final MetadataProperties metadataProperties;
+ private final ICcApplicationContext appCtx;
- public CCMetadataManagerImpl(IAsterixStateProxy proxy, MetadataProperties metadataProperties) {
- super(proxy);
+ CCMetadataManagerImpl(IAsterixStateProxy proxy, MetadataProperties metadataProperties,
+ ICcApplicationContext appCtx) {
+ super(Collections.singleton(proxy));
this.metadataProperties = metadataProperties;
+ this.appCtx = appCtx;
+ }
+
+ @Override
+ public MetadataTransactionContext beginTransaction() throws RemoteException {
+ TxnId txnId;
+ try {
+ txnId = appCtx.getTxnIdFactory().create();
+ } catch (AlgebricksException e) {
+ throw new ACIDException(e);
+ }
+ metadataNode.beginTransaction(txnId);
+ return new MetadataTransactionContext(txnId);
}
@Override
@@ -1020,8 +1063,8 @@
return;
}
try {
- metadataNode =
- proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), TimeUnit.SECONDS);
+ metadataNode = proxies.iterator().next()
+ .waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), TimeUnit.SECONDS);
if (metadataNode != null) {
rebindMetadataNode = false;
} else {
@@ -1038,4 +1081,17 @@
super.init();
}
}
+
+ private static class NCMetadataManagerImpl extends MetadataManager {
+ NCMetadataManagerImpl(Collection<IAsterixStateProxy> proxies, MetadataNode metadataNode) {
+ super(proxies, metadataNode);
+ }
+
+ @Override
+ public MetadataTransactionContext beginTransaction() throws RemoteException {
+ TxnId txnId = new TxnId(metadataNode.reserveTxnIdBlock(1));
+ metadataNode.beginTransaction(txnId);
+ return new MetadataTransactionContext(txnId);
+ }
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index e8f2595..72d5cf5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -132,6 +132,7 @@
private IDatasetLifecycleManager datasetLifecycleManager;
private ITransactionSubsystem transactionSubsystem;
private int metadataStoragePartition;
+ private transient BulkTxnIdFactory txnIdFactory;
// core only
private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
// extension only
@@ -157,6 +158,17 @@
}
}
}
+ this.txnIdFactory = new BulkTxnIdFactory();
+ }
+
+ @Override
+ public void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException {
+ txnIdFactory.ensureMinimumId(maxId);
+ }
+
+ @Override
+ public long reserveTxnIdBlock(int blockSize) throws ACIDException, RemoteException {
+ return txnIdFactory.reserveIdBlock(blockSize);
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index cdb27d7..c3f9d7f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.transactions.ITxnIdBlockProvider;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -51,7 +52,28 @@
* lock/access metadata shall always go through the MetadataManager, and should
* never call methods on the MetadataNode directly for any reason.
*/
-public interface IMetadataNode extends Remote, Serializable {
+public interface IMetadataNode extends Remote, Serializable, ITxnIdBlockProvider {
+
+ /**
+ * Ensures that transaction ids handed out in the future are larger than the supplied id
+ *
+ * @param maxId
+ * The txn id to ensure future txn ids are larger than
+ * @throws ACIDException
+ * @throws RemoteException
+ */
+ void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException;
+
+ /**
+ * Allocates a block of transaction ids of the specified block size
+ *
+ * @param blockSize
+ * The size of the transaction id block to reserve
+ * @return the start of the reserved block
+ * @throws ACIDException
+ * @throws RemoteException
+ */
+ long reserveTxnIdBlock(int blockSize) throws ACIDException, RemoteException;
/**
* Begins a local transaction against the metadata.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 8ab9f82..5357fc8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -58,6 +58,7 @@
@Override
public IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws InterruptedException {
synchronized (this) {
+ //TODO(mblow): replace with nanoTime() to avoid being affected by system clock adjustments...
long timeToWait = TimeUnit.MILLISECONDS.convert(waitFor, timeUnit);
while (metadataNode == null && timeToWait > 0) {
long startTime = System.currentTimeMillis();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f740d09..7a24400 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -1539,4 +1540,8 @@
public ICcApplicationContext getApplicationContext() {
return appCtx;
}
+
+ public ITxnIdFactory getTxnIdFactory() {
+ return appCtx.getTxnIdFactory();
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 2ebfe78..eb14bae 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -35,7 +35,6 @@
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -161,8 +160,9 @@
* the metadata provider.
* @return the AsterixDB job id for transaction management.
*/
- public static void bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) {
- TxnId txnId = TxnIdFactory.create();
+ public static void bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider)
+ throws AlgebricksException {
+ TxnId txnId = metadataProvider.getTxnIdFactory().create();
metadataProvider.setTxnId(txnId);
boolean isWriteTransaction = metadataProvider.isWriteTransaction();
IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, isWriteTransaction);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index 4b95253..db2a044 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -24,9 +24,10 @@
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -37,19 +38,27 @@
private static final Logger LOGGER = LogManager.getLogger();
private final long maxResourceId;
private final long maxTxnId;
+ private final long maxJobId;
private final String src;
- public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId) {
+ public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId, long maxJobId) {
this.src = src;
this.maxResourceId = maxResourceId;
this.maxTxnId = maxTxnId;
+ this.maxJobId = maxJobId;
}
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- TxnIdFactory.ensureMinimumId(maxTxnId);
+ try {
+ appCtx.getTxnIdFactory().ensureMinimumId(maxTxnId);
+ } catch (AlgebricksException e) {
+ throw HyracksDataException.create(e);
+ }
resourceIdManager.report(src, maxResourceId);
+ ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobIdFactory()
+ .setMaxJobId(maxJobId);
}
public static void send(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
@@ -57,8 +66,9 @@
long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+ long maxJobId = ncs.getMaxJobId(ccId);
ReportLocalCountersMessage countersMessage =
- new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId);
+ new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId, maxJobId);
try {
((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(ccId, countersMessage);
} catch (Exception e) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index d8f14a2..b83df6c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,6 +24,8 @@
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ActiveProperties;
@@ -85,12 +87,14 @@
private IMetadataLockManager mdLockManager;
private IClusterStateManager clusterStateManager;
private final INodeJobTracker nodeJobTracker;
+ private final ITxnIdFactory txnIdFactory;
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
- IMetadataLockManager mdLockManager) throws AlgebricksException, IOException {
+ IMetadataLockManager mdLockManager, Supplier<ILongBlockFactory> txnIdBlockSupplier)
+ throws AlgebricksException, IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
this.libraryManager = libraryManager;
@@ -118,6 +122,7 @@
clusterStateManager.setCcAppCtx(this);
this.resourceIdManager = new ResourceIdManager(clusterStateManager);
nodeJobTracker = new NodeJobTracker();
+ txnIdFactory = new CcTxnIdFactory(txnIdBlockSupplier);
}
@Override
@@ -265,4 +270,8 @@
public ICoordinationService getCoordinationService() {
return NoOpCoordinationService.INSTANCE;
}
+
+ public ITxnIdFactory getTxnIdFactory() {
+ return txnIdFactory;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
new file mode 100644
index 0000000..82bbe6b
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Represents a factory to generate unique transaction IDs.
+ */
+class CcTxnIdFactory implements ITxnIdFactory {
+ private static final int TXN_BLOCK_SIZE = 1024;
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private final Supplier<ILongBlockFactory> blockFactorySupplier;
+ private volatile Block block = new Block(0, 0);
+
+ public CcTxnIdFactory(Supplier<ILongBlockFactory> blockFactorySupplier) {
+ this.blockFactorySupplier = blockFactorySupplier;
+ }
+
+ @Override
+ public TxnId create() throws AlgebricksException {
+ while (true) {
+ try {
+ return new TxnId(block.nextId());
+ } catch (BlockExhaustedException ex) {
+ // retry
+ LOGGER.info("block exhausted; obtaining new block from supplier");
+ block = new Block(blockFactorySupplier.get().getBlock(TXN_BLOCK_SIZE), TXN_BLOCK_SIZE);
+ }
+ }
+ }
+
+ @Override
+ public void ensureMinimumId(long id) throws AlgebricksException {
+ blockFactorySupplier.get().ensureMinimum(id);
+ }
+
+ static class Block {
+ private static final BlockExhaustedException BLOCK_EXHAUSTED_EXCEPTION = new BlockExhaustedException();
+ private final AtomicLong id;
+ private final long start;
+ private final long endExclusive;
+
+ private Block(long start, long blockSize) {
+ this.id = new AtomicLong(start);
+ this.start = start;
+ this.endExclusive = start + blockSize;
+ }
+
+ private long nextId() throws BlockExhaustedException {
+ long nextId = id.incrementAndGet();
+ if (nextId >= endExclusive && (endExclusive >= start || nextId < start)) {
+ throw BLOCK_EXHAUSTED_EXCEPTION;
+ }
+ return nextId;
+ }
+ }
+
+ private static class BlockExhaustedException extends Exception {
+ }
+}
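
The refill-on-exhaustion behavior above reduces to a small standalone sketch (simplified to a synchronized method; class and variable names are illustrative): ids are served from a locally cached block, and the remote block factory is consulted only when that block runs out.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongUnaryOperator;

public class BlockedIdSourceSketch {
    private final LongUnaryOperator blockReserver; // blockSize -> first id of a fresh block
    private final int blockSize;
    private long next;
    private long endExclusive;

    public BlockedIdSourceSketch(LongUnaryOperator blockReserver, int blockSize) {
        this.blockReserver = blockReserver;
        this.blockSize = blockSize;
    }

    public synchronized long nextId() {
        if (next >= endExclusive) {
            // only here does a (potentially remote) call to the block factory happen
            next = blockReserver.applyAsLong(blockSize);
            endExclusive = next + blockSize;
        }
        return next++;
    }

    public static void main(String[] args) {
        AtomicLong remoteMax = new AtomicLong(); // stands in for the metadata node
        BlockedIdSourceSketch ids = new BlockedIdSourceSketch(size -> remoteMax.getAndAdd(size) + 1, 4);
        for (int i = 0; i < 10; i++) {
            System.out.println(ids.nextId()); // prints 1..10; blocks are fetched before 1, 5 and 9
        }
    }
}
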
diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
index 406a762..201945c 100644
--- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
+++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
@@ -22,27 +22,28 @@
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import javax.xml.transform.sax.SAXSource;
import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
public class TestSuiteParser {
- public TestSuiteParser() {
- }
- public org.apache.asterix.testframework.xml.TestSuite parse(File testSuiteCatalog) throws Exception {
+ public TestSuite parse(File testSuiteCatalog) throws SAXException, JAXBException, ParserConfigurationException {
SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
saxParserFactory.setNamespaceAware(true);
saxParserFactory.setXIncludeAware(true);
SAXParser saxParser = saxParserFactory.newSAXParser();
saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file");
- JAXBContext ctx = JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class);
+ JAXBContext ctx = JAXBContext.newInstance(TestSuite.class);
Unmarshaller um = ctx.createUnmarshaller();
- return (org.apache.asterix.testframework.xml.TestSuite) um.unmarshal(
+ return (TestSuite) um.unmarshal(
new SAXSource(saxParser.getXMLReader(), new InputSource(testSuiteCatalog.toURI().toString())));
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
deleted file mode 100644
index eb59e74..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.asterix.common.transactions.TxnId;
-
-/**
- * Represents a factory to generate unique transaction IDs.
- */
-public class TxnIdFactory {
-
- private static final AtomicLong id = new AtomicLong();
-
- private TxnIdFactory() {
- }
-
- public static TxnId create() {
- return new TxnId(id.incrementAndGet());
- }
-
- public static void ensureMinimumId(long id) {
- TxnIdFactory.id.updateAndGet(current -> Math.max(current, id));
- }
-}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
index 3d69ddb..0e04dca 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
@@ -18,23 +18,32 @@
*/
package org.apache.hyracks.api.client;
+import org.apache.hyracks.api.control.CcId;
+
import java.io.Serializable;
public class ClusterControllerInfo implements Serializable {
private static final long serialVersionUID = 1L;
+ private final CcId ccId;
+
private final String clientNetAddress;
private final int clientNetPort;
private final int webPort;
- public ClusterControllerInfo(String clientNetAddress, int clientNetPort, int webPort) {
+ public ClusterControllerInfo(CcId ccId, String clientNetAddress, int clientNetPort, int webPort) {
+ this.ccId = ccId;
this.clientNetAddress = clientNetAddress;
this.clientNetPort = clientNetPort;
this.webPort = webPort;
}
+ public CcId getCcId() {
+ return ccId;
+ }
+
public int getWebPort() {
return webPort;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
index d2a254f..aee22c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.api.config;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public interface IOptionType<T> {
@@ -26,6 +27,11 @@
*/
T parse(String s);
+ /**
+ * @throws IllegalArgumentException when the supplied JSON node cannot be interpreted
+ */
+ T parse(JsonNode node);
+
Class<T> targetType();
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
index 32782fd..2a7be9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
@@ -45,6 +45,10 @@
return id;
}
+ public long toLongMask() {
+ return (long) id << CcIdPartitionedLongFactory.ID_BITS;
+ }
+
@Override
public int hashCode() {
return id;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java
new file mode 100644
index 0000000..0a26494
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.control;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CcIdPartitionedLongFactory {
+ private static final int CC_BITS = Short.SIZE;
+ public static final int ID_BITS = Long.SIZE - CC_BITS;
+ public static final long MAX_ID = (1L << ID_BITS) - 1;
+ private final CcId ccId;
+ private final AtomicLong id;
+
+ public CcIdPartitionedLongFactory(CcId ccId) {
+ this.ccId = ccId;
+ id = new AtomicLong(ccId.toLongMask());
+ }
+
+ protected long nextId() {
+ return id.getAndUpdate(prev -> {
+ if ((prev & MAX_ID) == MAX_ID) {
+ return prev ^ MAX_ID;
+ } else {
+ return prev + 1;
+ }
+ });
+ }
+
+ protected long maxId() {
+ long next = id.get();
+ if ((next & MAX_ID) == 0) {
+ return next | MAX_ID;
+ } else {
+ return next - 1;
+ }
+ }
+
+ protected void ensureMinimumId(long id) {
+ if ((id & ~MAX_ID) != ccId.toLongMask()) {
+ throw new IllegalArgumentException("cannot change ccId as part of ensureMinimumId() (was: "
+ + Long.toHexString(this.id.get()) + ", given: " + Long.toHexString(id) + ")");
+ }
+ this.id.updateAndGet(current -> Math.max(current, id));
+ }
+
+ public CcId getCcId() {
+ return ccId;
+ }
+}
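
The partitioning itself is plain bit arithmetic: the upper 16 bits carry the CcId and the lower 48 bits carry the per-controller counter. A standalone illustration (class name is made up):

public class CcIdLayoutSketch {
    static final int ID_BITS = Long.SIZE - Short.SIZE; // 48
    static final long MAX_ID = (1L << ID_BITS) - 1; // 0x0000FFFFFFFFFFFF

    public static void main(String[] args) {
        int ccId = 0x0002;
        long counter = 5;
        long packed = ((long) ccId << ID_BITS) | counter;
        System.out.println(Long.toHexString(packed)); // 2000000000005
        System.out.println(packed >>> ID_BITS);       // 2 (the CcId)
        System.out.println(packed & MAX_ID);          // 5 (the id within that CC)
    }
}
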
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index c83366f..de6b5ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -22,23 +22,24 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.control.CcIdPartitionedLongFactory;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IWritable;
public final class JobId implements IWritable, Serializable, Comparable {
- private static final int CC_BITS = Short.SIZE;
- static final int ID_BITS = Long.SIZE - CC_BITS;
- static final long MAX_ID = (1L << ID_BITS) - 1;
+ private static final Pattern jobIdPattern = Pattern.compile("^JID:(\\d+)\\.(\\d+)$");
public static final JobId INVALID = null;
private static final long serialVersionUID = 1L;
private long id;
- private transient CcId ccId;
+ private transient volatile CcId ccId;
public static JobId create(DataInput dis) throws IOException {
JobId jobId = new JobId();
@@ -59,13 +60,13 @@
public CcId getCcId() {
if (ccId == null) {
- ccId = CcId.valueOf((int) (id >>> ID_BITS));
+ ccId = CcId.valueOf((int) (id >>> CcIdPartitionedLongFactory.ID_BITS));
}
return ccId;
}
public long getIdOnly() {
- return id & MAX_ID;
+ return id & CcIdPartitionedLongFactory.MAX_ID;
}
@Override
@@ -80,13 +81,17 @@
@Override
public String toString() {
- return "JID:" + id;
+ return "JID:" + (id >>> CcIdPartitionedLongFactory.ID_BITS) + "." + getIdOnly();
}
public static JobId parse(String str) throws HyracksDataException {
- if (str.startsWith("JID:")) {
- str = str.substring(4);
- return new JobId(Long.parseLong(str));
+ Matcher m = jobIdPattern.matcher(str);
+ if (m.matches()) {
+ int ccId = Integer.parseInt(m.group(1));
+ if (ccId <= 0xffff && ccId >= 0) {
+ long jobId = Long.parseLong(m.group(2)) | (long) ccId << CcIdPartitionedLongFactory.ID_BITS;
+ return new JobId(jobId);
+ }
}
throw HyracksDataException.create(ErrorCode.NOT_A_JOBID, str);
}
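
A standalone sketch of the round trip behind the new toString()/parse() pair, which now renders a job id as "JID:<ccId>.<idWithinCc>" instead of one opaque long (the helper class below is illustrative):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdStringSketch {
    private static final int ID_BITS = 48;
    private static final Pattern JOB_ID = Pattern.compile("^JID:(\\d+)\\.(\\d+)$");

    static String format(long packed) {
        return "JID:" + (packed >>> ID_BITS) + "." + (packed & ((1L << ID_BITS) - 1));
    }

    static long parse(String str) {
        Matcher m = JOB_ID.matcher(str);
        if (!m.matches()) {
            throw new IllegalArgumentException(str);
        }
        return Long.parseLong(m.group(2)) | Long.parseLong(m.group(1)) << ID_BITS;
    }

    public static void main(String[] args) {
        long packed = (3L << ID_BITS) | 42;
        System.out.println(format(packed));                  // JID:3.42
        System.out.println(parse(format(packed)) == packed); // true
    }
}
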
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index 1bb5749..528d35b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -18,36 +18,23 @@
*/
package org.apache.hyracks.api.job;
-import static org.apache.hyracks.api.job.JobId.ID_BITS;
-import static org.apache.hyracks.api.job.JobId.MAX_ID;
-
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.control.CcIdPartitionedLongFactory;
-public class JobIdFactory {
- private final AtomicLong id;
-
+public class JobIdFactory extends CcIdPartitionedLongFactory {
public JobIdFactory(CcId ccId) {
- id = new AtomicLong((long) ccId.shortValue() << ID_BITS);
+ super(ccId);
}
public JobId create() {
- return new JobId(id.getAndUpdate(prev -> {
- if ((prev & MAX_ID) == MAX_ID) {
- return prev ^ MAX_ID;
- } else {
- return prev + 1;
- }
- }));
+ return new JobId(nextId());
}
public JobId maxJobId() {
- long next = id.get();
- if ((next & MAX_ID) == 0) {
- return new JobId(next | MAX_ID);
- } else {
- return new JobId(next - 1);
- }
+ return new JobId(maxId());
+ }
+
+ public void setMaxJobId(long maxJobId) {
+ ensureMinimumId(maxJobId + 1);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java
similarity index 80%
rename from hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java
index 709f098..d2c1d09 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java
@@ -19,23 +19,26 @@
package org.apache.hyracks.api.job;
import java.lang.reflect.Field;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.control.CcIdPartitionedLongFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-public class JobIdFactoryTest {
+public class JobIdTest {
private static Field idField;
@BeforeClass
public static void setup() throws NoSuchFieldException {
- idField = JobIdFactory.class.getDeclaredField("id");
+ idField = CcIdPartitionedLongFactory.class.getDeclaredField("id");
idField.setAccessible(true);
}
@@ -77,7 +80,7 @@
theId.set((((long) 1 << 48) - 1) | expected);
JobId jobId = factory.create();
Assert.assertEquals(ccId, jobId.getCcId());
- Assert.assertEquals(JobId.MAX_ID, jobId.getIdOnly());
+ Assert.assertEquals(CcIdPartitionedLongFactory.MAX_ID, jobId.getIdOnly());
jobId = factory.create();
Assert.assertEquals(ccId, jobId.getCcId());
Assert.assertEquals(0, jobId.getIdOnly());
@@ -115,4 +118,18 @@
} catch (IllegalArgumentException e) {
}
}
+
+ @Test
+ public void testParse() throws HyracksDataException {
+ for (int ccId : Arrays.asList(0xFFFF, 0, (int) Short.MAX_VALUE)) {
+ JobIdFactory factory = new JobIdFactory(CcId.valueOf(ccId));
+ for (int i = 0; i < 1000; i++) {
+ final JobId jobId = factory.create();
+ Assert.assertEquals(jobId.getId(), JobId.parse(jobId.toString()).getId());
+ Assert.assertEquals(jobId, JobId.parse(jobId.toString()));
+ Assert.assertFalse(jobId.toString(), jobId.toString().contains("-"));
+ System.err.println(jobId.toString());
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a6edd70..f8fe77f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -215,7 +215,7 @@
clusterIPC.start();
clientIPC.start();
webServer.start();
- info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
+ info = new ClusterControllerInfo(ccId, ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
webServer.getListeningPort());
timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis());
jobLog.open();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index 26245e1..de166dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -42,7 +42,6 @@
import org.apache.hyracks.control.common.application.ServiceContext;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
-import org.apache.hyracks.control.common.work.IResultCallback;
public class CCServiceContext extends ServiceContext implements ICCServiceContext {
private final ICCContext ccContext;
@@ -50,9 +49,6 @@
protected final Set<String> initPendingNodeIds;
protected final Set<String> deinitPendingNodeIds;
- protected IResultCallback<Object> initializationCallback;
- protected IResultCallback<Object> deinitializationCallback;
-
private List<IJobLifecycleListener> jobLifecycleListeners;
private List<IClusterLifecycleListener> clusterLifecycleListeners;
private final ClusterControllerService ccs;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 77ecbee..96f5f1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -71,6 +71,7 @@
params.setDistributedState(ccs.getContext().getDistributedState());
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
+ params.setRegistrationId(reg.getRegistrationId());
result = new CCNCFunctions.NodeRegistrationResult(params, null);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Node registration failed", e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 9cf84dd..a95ae3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -73,6 +73,4 @@
void getNodeControllerInfos() throws Exception;
void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
-
- CcId getCcId();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index 986ca96..fd8c116 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -73,7 +73,9 @@
new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator());
private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class);
@SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
- private Map<String, Map<IOption, Object>> nodeSpecificMap = Collections.synchronizedMap(new TreeMap<>());
+ private Map<String, Map<IOption, Object>> nodeSpecificDefinedMap = Collections.synchronizedMap(new TreeMap<>());
+ @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
+ private Map<String, Map<IOption, Object>> nodeSpecificDefaultMap = Collections.synchronizedMap(new TreeMap<>());
private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>();
private final String[] args;
private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this);
@@ -154,26 +156,28 @@
}
} else {
registeredOptions.add(option);
- optionSetters.put(option,
- (node, value,
- isDefault) -> correctedMap(option.section() == Section.NC ? node : null, isDefault)
- .put(option, value));
+ optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
if (LOGGER.isDebugEnabled()) {
- optionSetters.put(option, (node, value, isDefault) -> LOGGER
- .debug((isDefault ? "defaulting" : "setting ") + option.toIniString() + " to " + value));
+ optionSetters.put(option, (node, value, isDefault) -> LOGGER.debug("{} {} to {} for node {}",
+ isDefault ? "defaulting" : "setting", option.toIniString(), value, node));
}
}
}
}
private Map<IOption, Object> correctedMap(String node, boolean isDefault) {
- return node == null ? (isDefault ? defaultMap : definedMap)
- : nodeSpecificMap.computeIfAbsent(node, this::createNodeSpecificMap);
+ if (node == null) {
+ return isDefault ? defaultMap : definedMap;
+ } else {
+ ensureNode(node);
+ return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node);
+ }
}
public void ensureNode(String nodeId) {
LOGGER.debug("ensureNode: " + nodeId);
- nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+ nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+ nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
}
private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
@@ -352,17 +356,13 @@
private void applyDefaults() {
LOGGER.debug("applying defaults");
sectionMap.forEach((key, value) -> {
- if (key == Section.NC) {
- value.values().forEach(option -> getNodeNames()
- .forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node)));
- for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificMap.entrySet()) {
- value.values()
- .forEach(option -> getOrDefault(
- new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option,
- nodeMap.getKey()));
- }
- } else {
- value.values().forEach(option -> getOrDefault(configurationMap, option, null));
+ value.values().forEach(
+ option -> getNodeNames().forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node)));
+ for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificDefinedMap.entrySet()) {
+ value.values()
+ .forEach(option -> getOrDefault(
+ new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option,
+ nodeMap.getKey()));
}
});
}
@@ -433,17 +433,18 @@
}
public List<String> getNodeNames() {
- return Collections.unmodifiableList(new ArrayList<>(nodeSpecificMap.keySet()));
+ return Collections.unmodifiableList(new ArrayList<>(nodeSpecificDefinedMap.keySet()));
}
public IApplicationConfig getNodeEffectiveConfig(String nodeId) {
- final Map<IOption, Object> nodeMap = nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+ ensureNode(nodeId);
+ final Map<IOption, Object> nodeMap = nodeSpecificDefaultMap.get(nodeId);
Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId);
return new ConfigManagerApplicationConfig(this) {
@Override
public Object getStatic(IOption option) {
if (!nodeEffectiveMap.containsKey(option)) {
- // we need to calculate the default the the context of the node specific map...
+ // we need to calculate the default within the context of the node specific map...
nodeMap.put(option, getOrDefault(nodeEffectiveMap, option, nodeId));
}
return nodeEffectiveMap.get(option);
@@ -451,8 +452,14 @@
};
}
- private CompositeMap<IOption, Object> getNodeEffectiveMap(String nodeId) {
- return new CompositeMap<>(nodeSpecificMap.get(nodeId), definedMap, new NoOpMapMutator());
+ private Map<IOption, Object> getNodeEffectiveMap(String nodeId) {
+ ensureNode(nodeId);
+ CompositeMap<IOption, Object> nodeEffectiveMap = new CompositeMap<>();
+ nodeEffectiveMap.setMutator(new NoOpMapMutator());
+ nodeEffectiveMap.addComposited(nodeSpecificDefinedMap.get(nodeId));
+ nodeEffectiveMap.addComposited(nodeSpecificDefaultMap.get(nodeId));
+ nodeEffectiveMap.addComposited(definedMap);
+ return nodeEffectiveMap;
}
public Ini toIni(boolean includeDefaults) {
@@ -462,8 +469,11 @@
ini.add(option.section().sectionName(), option.ini(), option.type().serializeToIni(value));
}
});
- nodeSpecificMap.forEach((key, nodeValueMap) -> {
+ for (String key : getNodeNames()) {
String section = Section.NC.sectionName() + "/" + key;
+ ensureNode(key);
+ Map<IOption, Object> nodeValueMap =
+ includeDefaults ? getNodeEffectiveMap(key) : nodeSpecificDefinedMap.get(key);
synchronized (nodeValueMap) {
for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) {
if (entry.getValue() != null) {
@@ -472,10 +482,9 @@
}
}
}
- });
- extensionOptions.forEach((extension, options) -> {
- options.forEach(option -> ini.add(extension, option.getKey(), option.getValue()));
- });
+ }
+ extensionOptions.forEach((extension, options) -> options
+ .forEach(option -> ini.add(extension, option.getKey(), option.getValue())));
return ini;
}
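
A rough plain-map sketch of the layering the split maps enable; the real lookup is delegated to CompositeMap as composed above, so this only illustrates the intent that a value set for a specific node wins over the cluster-wide value, with node-level defaults filling remaining gaps (option names and the class are made up):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NodeConfigLayeringSketch {
    static Object effective(String option, List<Map<String, Object>> layers) {
        for (Map<String, Object> layer : layers) {
            if (layer.containsKey(option)) {
                return layer.get(option);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> nodeDefined = new HashMap<>();
        Map<String, Object> nodeDefault = new HashMap<>();
        Map<String, Object> clusterDefined = new HashMap<>();
        List<Map<String, Object>> layers = Arrays.asList(nodeDefined, nodeDefault, clusterDefined);

        clusterDefined.put("iodevices", "/data/shared");
        System.out.println(effective("iodevices", layers)); // /data/shared
        nodeDefined.put("iodevices", "/data/node1");
        System.out.println(effective("iodevices", layers)); // /data/node1 - node-level override wins
    }
}
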
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index 3807a00..b188548 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -19,12 +19,16 @@
package org.apache.hyracks.control.common.config;
import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.util.StorageUtil;
import org.apache.logging.log4j.Level;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class OptionTypes {
@@ -43,6 +47,11 @@
}
@Override
+ public Integer parse(JsonNode node) {
+ return node.isNull() ? null : parse(node.asText());
+ }
+
+ @Override
public Class<Integer> targetType() {
return Integer.class;
}
@@ -65,6 +74,11 @@
}
@Override
+ public Long parse(JsonNode node) {
+ return node.isNull() ? null : parse(node.asText());
+ }
+
+ @Override
public Class<Long> targetType() {
return Long.class;
}
@@ -84,13 +98,22 @@
@Override
public Short parse(String s) {
int value = Integer.decode(s);
- if (Integer.highestOneBit(value) > 16) {
- throw new IllegalArgumentException("The given value " + s + " is too big for a short");
+ return validateShort(value);
+ }
+
+ private Short validateShort(int value) {
+ if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) {
+ throw new IllegalArgumentException("The given value " + value + " does not fit in a short");
}
return (short) value;
}
@Override
+ public Short parse(JsonNode node) {
+ return node.isNull() ? null : validateShort(node.asInt());
+ }
+
+ @Override
public Class<Short> targetType() {
return Short.class;
}
@@ -108,6 +131,11 @@
}
@Override
+ public Integer parse(JsonNode node) {
+ return node.isNull() ? null : node.asInt();
+ }
+
+ @Override
public Class<Integer> targetType() {
return Integer.class;
}
@@ -125,6 +153,11 @@
}
@Override
+ public Double parse(JsonNode node) {
+ return node.isNull() ? null : node.asDouble();
+ }
+
+ @Override
public Class<Double> targetType() {
return Double.class;
}
@@ -142,6 +175,11 @@
}
@Override
+ public String parse(JsonNode node) {
+ return node.isNull() ? null : node.asText();
+ }
+
+ @Override
public Class<String> targetType() {
return String.class;
}
@@ -159,6 +197,11 @@
}
@Override
+ public Long parse(JsonNode node) {
+ return node.isNull() ? null : node.asLong();
+ }
+
+ @Override
public Class<Long> targetType() {
return Long.class;
}
@@ -176,6 +219,11 @@
}
@Override
+ public Boolean parse(JsonNode node) {
+ return node.isNull() ? null : node.asBoolean();
+ }
+
+ @Override
public Class<Boolean> targetType() {
return Boolean.class;
}
@@ -200,6 +248,11 @@
}
@Override
+ public Level parse(JsonNode node) {
+ return node.isNull() ? null : parse(node.asText());
+ }
+
+ @Override
public Class<Level> targetType() {
return Level.class;
}
@@ -227,6 +280,20 @@
}
@Override
+ public String[] parse(JsonNode node) {
+ if (node.isNull()) {
+ return null;
+ }
+ List<String> strings = new ArrayList<>();
+ if (node instanceof ArrayNode) {
+ node.elements().forEachRemaining(n -> strings.add(n.asText()));
+ return strings.toArray(new String[strings.size()]);
+ } else {
+ return parse(node.asText());
+ }
+ }
+
+ @Override
public Class<String[]> targetType() {
return String[].class;
}
@@ -253,6 +320,11 @@
}
@Override
+ public java.net.URL parse(JsonNode node) {
+ return node.isNull() ? null : parse(node.asText());
+ }
+
+ @Override
public Class<java.net.URL> targetType() {
return java.net.URL.class;
}
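
Each of the new parse(JsonNode) overloads follows the same pattern: a JSON null maps to a Java null, and any other node is converted natively or routed through the existing String parser. A standalone sketch of that pattern (the helper class is illustrative):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonOptionParseSketch {
    static Integer parseInteger(JsonNode node) {
        return node.isNull() ? null : node.asInt();
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper om = new ObjectMapper();
        System.out.println(parseInteger(om.readTree("42")));   // 42
        System.out.println(parseInteger(om.readTree("null"))); // null
    }
}
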
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 519bafc..75c0827 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -50,7 +50,6 @@
NCSERVICE_PORT(INTEGER, 9090),
CLUSTER_ADDRESS(STRING, (String) null),
CLUSTER_PORT(INTEGER, 1099),
- CLUSTER_CONTROLLER_ID(SHORT, (short) 0x0000),
CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
NODE_ID(STRING, (String) null),
@@ -144,8 +143,6 @@
return "Cluster Controller port";
case CLUSTER_LISTEN_PORT:
return "IP port to bind cluster listener";
- case CLUSTER_CONTROLLER_ID:
- return "16-bit (0-65535) id of the Cluster Controller";
case CLUSTER_PUBLIC_ADDRESS:
return "Public IP Address to announce cluster listener";
case CLUSTER_PUBLIC_PORT:
@@ -313,10 +310,6 @@
configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort);
}
- public CcId getClusterControllerId() {
- return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID));
- }
-
public String getClusterListenAddress() {
return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
index bf233a8..e78a423 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
@@ -33,6 +33,8 @@
private int profileDumpPeriod;
+ private int registrationId;
+
public ClusterControllerInfo getClusterControllerInfo() {
return ccInfo;
}
@@ -64,4 +66,12 @@
public void setProfileDumpPeriod(int profileDumpPeriod) {
this.profileDumpPeriod = profileDumpPeriod;
}
+
+ public int getRegistrationId() {
+ return registrationId;
+ }
+
+ public void setRegistrationId(int registrationId) {
+ this.registrationId = registrationId;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 75ef0b7..a87c30a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -72,13 +73,15 @@
private final NodeCapacity capacity;
- private final long maxJobId;
+ private final int registrationId;
+
+ private static final AtomicInteger nextRegistrationId = new AtomicInteger();
public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema,
- NetworkAddress messagingPort, NodeCapacity capacity, int pid, long maxJobId) {
+ NetworkAddress messagingPort, NodeCapacity capacity, int pid) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
@@ -100,7 +103,7 @@
this.messagingPort = messagingPort;
this.capacity = capacity;
this.pid = pid;
- this.maxJobId = maxJobId;
+ this.registrationId = nextRegistrationId.getAndIncrement();
}
public InetSocketAddress getNodeControllerAddress() {
@@ -187,7 +190,7 @@
return pid;
}
- public long getMaxJobId() {
- return maxJobId;
+ public int getRegistrationId() {
+ return registrationId;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0fdafe3..ae40ea3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -57,11 +57,9 @@
public class ClusterControllerRemoteProxy implements IClusterController {
- private final CcId ccId;
private IIPCHandle ipcHandle;
- public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
- this.ccId = ccId;
+ public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
this.ipcHandle = ipcHandle;
}
@@ -178,12 +176,7 @@
}
@Override
- public CcId getCcId() {
- return ccId;
- }
-
- @Override
public String toString() {
- return getClass().getSimpleName() + " " + ccId + " [" + ipcHandle.getRemoteAddress() + "]";
+ return getClass().getSimpleName() + " [" + ipcHandle.getRemoteAddress() + "]";
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
new file mode 100644
index 0000000..63fffb4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.control.common.base.IClusterController;
+import org.apache.hyracks.control.common.controllers.NodeParameters;
+import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
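+/**
+ * Tracks an NC's IPC connection to a single cluster controller and the pending/completed
+ * state of this node's registration with that CC.
+ */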
+public class CcConnection {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private final IClusterController ccs;
+ private boolean registrationPending;
+ private Exception registrationException;
+ private NodeParameters nodeParameters;
+
+ CcConnection(IClusterController ccs) {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public String toString() {
+ return ccs.toString();
+ }
+
+ public CcId getCcId() {
+ return getNodeParameters().getClusterControllerInfo().getCcId();
+ }
+
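+ // called (via NodeControllerService) when the CC's registration response arrives;
+ // records the outcome and wakes any thread blocked in registerNode()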
+ synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ nodeParameters = parameters;
+ registrationException = exception;
+ registrationPending = false;
+ notifyAll();
+ }
+
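+ // sends the registration request, then blocks until setNodeRegistrationResult delivers the
+ // CC's response; returns the assigned CcId, or rethrows the failure reported by the CC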
+ public synchronized CcId registerNode(NodeRegistration nodeRegistration) throws Exception {
+ registrationPending = true;
+ ccs.registerNode(nodeRegistration);
+ while (registrationPending) {
+ wait();
+ }
+ if (registrationException != null) {
+ LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, registrationException);
+ throw registrationException;
+ }
+ return getCcId();
+ }
+
+ public IClusterController getClusterControllerService() {
+ return ccs;
+ }
+
+ public NodeParameters getNodeParameters() {
+ return nodeParameters;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
index ec8cf27..a03e0ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
@@ -53,7 +53,7 @@
LOGGER.log(Level.DEBUG, "Exception parsing command line: " + Arrays.toString(args), e);
System.exit(2);
} catch (Exception e) {
- LOGGER.log(Level.DEBUG, "Exiting NCDriver due to exception", e);
+ LOGGER.error("Exiting NCDriver due to exception", e);
System.exit(1);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0e74a4c..b1909dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
@@ -84,6 +85,7 @@
import org.apache.hyracks.control.nc.net.NetworkManager;
import org.apache.hyracks.control.nc.partitions.PartitionManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
+import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
import org.apache.hyracks.ipc.api.IIPCEventListener;
import org.apache.hyracks.ipc.api.IIPCHandle;
@@ -108,7 +110,7 @@
private static final double MEMORY_FUDGE_FACTOR = 0.8;
private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
- private NCConfig ncConfig;
+ private final NCConfig ncConfig;
private final String id;
@@ -128,13 +130,15 @@
private final Timer timer;
- private boolean registrationPending;
+ private CcId primaryCcId;
- private Exception registrationException;
+ private final Object ccLock = new Object();
- private IClusterController primaryCcs;
+ private final Map<CcId, CcConnection> ccMap = Collections.synchronizedMap(new HashMap<>());
- private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>());
+ private final Map<InetSocketAddress, CcId> ccAddressMap = Collections.synchronizedMap(new HashMap<>());
+
+ private final Map<Integer, CcConnection> pendingRegistrations = Collections.synchronizedMap(new HashMap<>());
private final Map<JobId, Joblet> jobletMap;
@@ -144,11 +148,9 @@
private ExecutorService executor;
- private NodeParameters nodeParameters;
+ private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>();
- private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>();
-
- private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>();
+ private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>();
private final ServerContext serverCtx;
@@ -180,9 +182,7 @@
private final ConfigManager configManager;
- private NodeRegistration nodeRegistration;
-
- private final AtomicLong maxJobId = new AtomicLong(-1);
+ private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>();
static {
ExitUtil.init();
@@ -254,7 +254,7 @@
}
getNodeControllerInfosAcceptor.setValue(fv);
}
- primaryCcs.getNodeControllerInfos();
+ getPrimaryClusterController().getNodeControllerInfos();
return fv.get();
}
@@ -302,9 +302,7 @@
messagingNetManager.start();
}
- final InetSocketAddress ccAddress =
- new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort());
- this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress);
+ this.primaryCcId = addCc(new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
workQueue.start();
@@ -315,64 +313,81 @@
application.startupCompleted();
}
- public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception {
- ClusterControllerRemoteProxy ccProxy;
- synchronized (ccsMap) {
- if (ccsMap.containsKey(ccId)) {
- throw new IllegalStateException("cc already registered: " + ccId);
+ public CcId addCc(InetSocketAddress ccAddress) throws Exception {
+ synchronized (ccLock) {
+ LOGGER.info("addCc: {}", ccAddress);
+ if (ccAddress.isUnresolved()) {
+ throw new IllegalArgumentException("must use resolved InetSocketAddress");
+ }
+ if (ccAddressMap.containsKey(ccAddress)) {
+ throw new IllegalStateException("cc already registered: " + ccAddress);
}
final IIPCEventListener ipcEventListener = new IIPCEventListener() {
@Override
public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
// we need to re-register in case of NC -> CC connection reset
try {
- registerNode(ccsMap.get(ccId));
+ registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Failed Registering with cc", e);
throw new IPCException(e);
}
}
};
- ccProxy = new ClusterControllerRemoteProxy(ccId,
+ ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy(
ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
- registerNode(ccProxy);
- ccsMap.put(ccId, ccProxy);
- }
- return ccProxy;
- }
-
- public void makePrimaryCc(CcId ccId) throws Exception {
- synchronized (ccsMap) {
- if (!ccsMap.containsKey(ccId)) {
- throw new IllegalArgumentException("unknown cc: " + ccId);
- }
- primaryCcs = ccsMap.get(ccId);
+ CcConnection ccc = new CcConnection(ccProxy);
+ return registerNode(ccc, ccAddress);
}
}
- public void removeCc(CcId ccId) throws Exception {
- synchronized (ccsMap) {
- final IClusterController ccs = ccsMap.get(ccId);
- if (ccs == null) {
- throw new IllegalArgumentException("unknown cc: " + ccId);
+ public void makePrimaryCc(InetSocketAddress ccAddress) throws Exception {
+ LOGGER.info("makePrimaryCc: {}", ccAddress);
+ if (ccAddress.isUnresolved()) {
+ throw new IllegalArgumentException("must use resolved InetSocketAddress");
+ }
+ CcId newPrimaryCc = ccAddressMap.get(ccAddress);
+ if (newPrimaryCc == null) {
+ throw new IllegalArgumentException("unknown cc: " + ccAddress);
+ }
+ this.primaryCcId = newPrimaryCc;
+ }
+
+ public void removeCc(InetSocketAddress ccAddress) throws Exception {
+ synchronized (ccLock) {
+ LOGGER.info("removeCc: {}", ccAddress);
+ if (ccAddress.isUnresolved()) {
+ throw new IllegalArgumentException("must use resolved InetSocketAddress");
}
- if (primaryCcs.equals(ccs)) {
- throw new IllegalStateException("cannot remove primary cc: " + ccId);
+ CcId ccId = ccAddressMap.get(ccAddress);
+ if (ccId == null) {
+ LOGGER.warn("ignoring request to remove unknown cc: {}", ccAddress);
+ return;
}
- // TODO(mblow): consider how to handle running jobs
- ccs.unregisterNode(id);
- Thread hbThread = heartbeatThreads.remove(ccs);
+ if (primaryCcId.equals(ccId)) {
+ throw new IllegalStateException("cannot remove primary cc: " + ccAddress);
+ }
+ try {
+ final CcConnection ccc = getCcConnection(ccId);
+ ccc.getClusterControllerService().unregisterNode(id);
+ } catch (Exception e) {
+ LOGGER.warn("ignoring exception trying to gracefully unregister cc {}: ", () -> ccId,
+ () -> String.valueOf(e));
+ }
+ getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId));
+ Thread hbThread = heartbeatThreads.remove(ccId);
hbThread.interrupt();
- Timer ccTimer = ccTimers.remove(ccs);
+ Timer ccTimer = ccTimers.remove(ccId);
if (ccTimer != null) {
ccTimer.cancel();
}
+ ccMap.remove(ccId);
+ ccAddressMap.remove(ccAddress);
}
}
- protected void registerNode(IClusterController ccs) throws Exception {
- LOGGER.info("Registering with Cluster Controller {}", ccs);
- registrationPending = true;
+ protected CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
+ LOGGER.info("Registering with Cluster Controller {}", ccc);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
@@ -389,54 +404,70 @@
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
NetworkAddress messagingAddress =
messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
- int allCores = osMXBean.getAvailableProcessors();
- nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, osMXBean.getName(),
- osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
- runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(),
- runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
- runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(),
- PidHelper.getPid(), maxJobId.get());
+ NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress,
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+ runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress,
+ application.getCapacity(), PidHelper.getPid());
- ccs.registerNode(nodeRegistration);
-
- completeNodeRegistration(ccs);
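+ // remember this connection under its registration id so the asynchronous response
+ // can be matched back to it in setNodeRegistrationResult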
+ pendingRegistrations.put(nodeRegistration.getRegistrationId(), ccc);
+ CcId ccId = ccc.registerNode(nodeRegistration);
+ ccMap.put(ccId, ccc);
+ ccAddressMap.put(ccAddress, ccId);
+ Serializable distributedState = ccc.getNodeParameters().getDistributedState();
+ if (distributedState != null) {
+ getDistributedState().put(ccId, distributedState);
+ }
+ application.onRegisterNode(ccId);
+ IClusterController ccs = ccc.getClusterControllerService();
+ NodeParameters nodeParameters = ccc.getNodeParameters();
// Start heartbeat generator.
- if (!heartbeatThreads.containsKey(ccs)) {
+ if (!heartbeatThreads.containsKey(ccId)) {
Thread heartbeatThread =
new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
heartbeatThread.setPriority(Thread.MAX_PRIORITY);
heartbeatThread.setDaemon(true);
heartbeatThread.start();
- heartbeatThreads.put(ccs, heartbeatThread);
+ heartbeatThreads.put(ccId, heartbeatThread);
}
- if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) {
- Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true);
+ if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) {
+ Timer ccTimer = new Timer("Timer-" + ccId, true);
// Schedule profile dump generator.
- ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
- ccTimers.put(ccs, ccTimer);
+ ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod());
+ ccTimers.put(ccId, ccTimer);
}
- LOGGER.info("Registering with Cluster Controller {} complete", ccs);
+ LOGGER.info("Registering with Cluster Controller {} complete", ccc);
+ return ccId;
}
- synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
- this.nodeParameters = parameters;
- this.registrationException = exception;
- this.registrationPending = false;
- notifyAll();
+ void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ CcConnection ccc = getPendingNodeRegistration(parameters);
+ ccc.setNodeRegistrationResult(parameters, exception);
}
- private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception {
- while (registrationPending) {
- wait();
+ private CcConnection getCcConnection(CcId ccId) {
+ CcConnection ccConnection = ccMap.get(ccId);
+ if (ccConnection == null) {
+ throw new IllegalArgumentException("unknown ccId: " + ccId);
}
- if (registrationException != null) {
- LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", registrationException);
- throw registrationException;
+ return ccConnection;
+ }
+
+ private CcConnection getPendingNodeRegistration(NodeParameters nodeParameters) {
+ CcConnection ccConnection = pendingRegistrations.remove(nodeParameters.getRegistrationId());
+ if (ccConnection == null) {
+ throw new IllegalStateException("Unknown pending node registration " + nodeParameters.getRegistrationId()
+ + " for " + nodeParameters.getClusterControllerInfo().getCcId());
}
- serviceCtx.setDistributedState(nodeParameters.getDistributedState());
- application.onRegisterNode(ccs.getCcId());
+ return ccConnection;
+ }
+
+ private ConcurrentHashMap<CcId, Serializable> getDistributedState() {
+ //noinspection unchecked
+ return (ConcurrentHashMap<CcId, Serializable>) serviceCtx.getDistributedState();
}
private void startApplication() throws Exception {
@@ -448,7 +479,12 @@
}
public void updateMaxJobId(JobId jobId) {
- maxJobId.getAndUpdate(currentMaxId -> Math.max(currentMaxId, jobId.getId()));
+ maxJobIds.computeIfAbsent(jobId.getCcId(), key -> new AtomicLong())
+ .getAndUpdate(currentMaxId -> Math.max(currentMaxId, jobId.getId()));
+ }
+
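+ // a CC with no jobs seen yet starts from ccId.toLongMask(), presumably the base of that CC's job id range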
+ public long getMaxJobId(CcId ccId) {
+ return maxJobIds.computeIfAbsent(ccId, key -> new AtomicLong(ccId.toLongMask())).get();
}
@Override
@@ -478,10 +514,10 @@
t.interrupt();
InvokeUtil.doUninterruptibly(() -> t.join(1000));
});
- synchronized (ccsMap) {
- ccsMap.values().parallelStream().forEach(ccs -> {
+ synchronized (ccLock) {
+ ccMap.values().parallelStream().forEach(cc -> {
try {
- ccs.notifyShutdown(id);
+ cc.getClusterControllerService().notifyShutdown(id);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
}
@@ -520,13 +556,8 @@
jobParameterByteStoreMap.remove(jobId);
}
- public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
- JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
- if (jpbs == null) {
- jpbs = new JobParameterByteStore();
- jobParameterByteStoreMap.put(jobId, jpbs);
- }
- return jpbs;
+ public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) {
+ return jobParameterByteStoreMap.computeIfAbsent(jobId, jid -> new JobParameterByteStore());
}
public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg)
@@ -550,7 +581,7 @@
}
}
- public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+ public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) {
return deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId());
}
@@ -566,16 +597,21 @@
return partitionManager;
}
+ public CcId getPrimaryCcId() {
+ // TODO(mblow): this can change at any time, need notification framework
+ return primaryCcId;
+ }
+
public IClusterController getPrimaryClusterController() {
- return primaryCcs;
+ return getClusterController(primaryCcId);
}
public IClusterController getClusterController(CcId ccId) {
- return ccsMap.get(ccId);
+ return getCcConnection(ccId).getClusterControllerService();
}
- public NodeParameters getNodeParameters() {
- return nodeParameters;
+ public NodeParameters getNodeParameters(CcId ccId) {
+ return getCcConnection(ccId).getNodeParameters();
}
@Override
@@ -691,17 +727,19 @@
}
private class ProfileDumpTask extends TimerTask {
- private IClusterController cc;
+ private final IClusterController cc;
+ private final CcId ccId;
- public ProfileDumpTask(IClusterController cc) {
+ public ProfileDumpTask(IClusterController cc, CcId ccId) {
this.cc = cc;
+ this.ccId = ccId;
}
@Override
public void run() {
try {
FutureValue<List<JobProfile>> fv = new FutureValue<>();
- BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv);
+ BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, ccId, fv);
workQueue.scheduleAndSync(bjpw);
List<JobProfile> profiles = fv.get();
if (!profiles.isEmpty()) {
@@ -734,7 +772,7 @@
}
public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
- ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id);
+ getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
}
public IDatasetPartitionManager getDatasetPartitionManager() {
@@ -759,4 +797,5 @@
public Object getApplicationContext() {
return application.getApplicationContext();
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index 6a75471..87330226 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
@@ -50,7 +51,7 @@
public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager, String nodeId,
MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
- IApplicationConfig appConfig) throws IOException {
+ IApplicationConfig appConfig) {
super(serverCtx, appConfig, new HyracksThreadFactory(nodeId));
this.lccm = lifeCyclecomponentManager;
this.nodeId = nodeId;
@@ -59,6 +60,7 @@
this.ncs = ncs;
this.sdh = lccm::dumpState;
this.tracer = new Tracer(nodeId, ncs.getConfiguration().getTraceCategories(), new TraceCategoryRegistry());
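+ // distributed state is now tracked per cluster controller (CcId -> state supplied by that CC)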
+ this.distributedState = new ConcurrentHashMap<>();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 54a171d..2742aaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.util.file.FileUtil;
public class IOManager implements IIOManager {
/*
@@ -72,7 +73,11 @@
workspaces = new ArrayList<>();
for (IODeviceHandle d : ioDevices) {
if (d.getWorkspace() != null) {
- new File(d.getMount(), d.getWorkspace()).mkdirs();
+ try {
+ FileUtil.forceMkdirs(new File(d.getMount(), d.getWorkspace()));
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
workspaces.add(d);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
index c521f08..8de30ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
@@ -79,6 +79,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
index d6e175e..1b0093d 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
@@ -19,10 +19,18 @@
package org.apache.hyracks.util.file;
import java.io.File;
+import java.io.IOException;
import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
public class FileUtil {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final Object LOCK = new Object();
+
private FileUtil() {
}
@@ -30,6 +38,19 @@
return joinPath(File.separatorChar, elements);
}
+ public static void forceMkdirs(File dir) throws IOException {
+ File canonicalDir = dir.getCanonicalFile();
+ try {
+ FileUtils.forceMkdir(canonicalDir);
+ } catch (IOException e) {
+ LOGGER.warn("failure to create directory {}, retrying", dir, e);
+ synchronized (LOCK) {
+ FileUtils.forceMkdir(canonicalDir);
+ }
+ }
+
+ }
+
static String joinPath(char separatorChar, String... elements) {
final String separator = String.valueOf(separatorChar);
final String escapedSeparator = Pattern.quote(separator);
diff --git a/hyracks-fullstack/src/test/resources/log4j2-test.xml b/hyracks-fullstack/src/test/resources/log4j2-test.xml
index d56f215..a8141ee 100644
--- a/hyracks-fullstack/src/test/resources/log4j2-test.xml
+++ b/hyracks-fullstack/src/test/resources/log4j2-test.xml
@@ -32,12 +32,10 @@
<Root level="WARN">
<AppenderRef ref="InfoLog"/>
</Root>
- <Logger name="org.apache.hyracks" level="INFO" additivity="false">
- <AppenderRef ref="InfoLog"/>
- </Logger>
- <Logger name="org.apache.hyracks.test" level="INFO" additivity="false">
+ <Logger name="org.apache.hyracks.control.nc.service" level="INFO"/>
+ <Logger name="org.apache.hyracks" level="INFO"/>
+ <Logger name="org.apache.hyracks.test" level="INFO">
<AppenderRef ref="ConsoleTest"/>
- <AppenderRef ref="InfoLog"/>
</Logger>
</Loggers>
</Configuration>