Asterix MessageBroker implementation
This change includes the following:
- Add implementation for CC/NC MessageBroker.
- Implement GlobalResourceIdFactory using MessageBroker.
- Change resource id factory to GlobalResourceIdFactory.
- Refactor metadata indexes fixed properties.
- Use fixed resource ids for metadata indexes.
Change-Id: If4320e2c5a0130d2f86a4be6ae61f5cee43e30af
Reviewed-on: https://asterix-gerrit.ics.uci.edu/486
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index ce19139..45c0598 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -49,12 +49,13 @@
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.feeds.FeedManager;
-import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.replication.management.ReplicationChannel;
import org.apache.asterix.replication.management.ReplicationManager;
import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.GlobalResourceIdFactoryProvider;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -81,8 +82,7 @@
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
-import org.apache.hyracks.storage.common.file.ResourceIdFactoryProvider;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
@@ -118,7 +118,7 @@
private ILSMIOOperationScheduler lsmIOScheduler;
private ILocalResourceRepository localResourceRepository;
- private ResourceIdFactory resourceIdFactory;
+ private IResourceIdFactory resourceIdFactory;
private IIOManager ioManager;
private boolean isShuttingdown;
@@ -158,13 +158,14 @@
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
+ ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
+ ioManager, ncApplicationContext.getNodeId());
+ localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
+
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
txnProperties);
- ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
- ioManager, ncApplicationContext.getNodeId());
- localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
SystemState systemState = recoveryMgr.getSystemState();
@@ -175,7 +176,7 @@
initializeResourceIdFactory();
datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
- MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager());
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager());
isShuttingdown = false;
@@ -278,7 +279,7 @@
return localResourceRepository;
}
- public ResourceIdFactory getResourceIdFactory() {
+ public IResourceIdFactory getResourceIdFactory() {
return resourceIdFactory;
}
@@ -376,6 +377,7 @@
@Override
public void initializeResourceIdFactory() throws HyracksDataException {
- resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
+ resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext)
+ .createResourceIdFactory();
}
-}
\ No newline at end of file
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
index 570c3c9..b975970 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
@@ -32,7 +32,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public class AsterixAppRuntimeContextProdiverForRecovery implements IAsterixAppRuntimeContextProvider {
@@ -78,7 +78,7 @@
}
@Override
- public ResourceIdFactory getResourceIdFactory() {
+ public IResourceIdFactory getResourceIdFactory() {
return asterixAppRuntimeContext.getResourceIdFactory();
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 80ce5ea..d2164f4 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -43,6 +43,7 @@
import org.apache.asterix.event.service.ILookupService;
import org.apache.asterix.feeds.CentralFeedManager;
import org.apache.asterix.feeds.FeedLifecycleListener;
+import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
@@ -55,6 +56,8 @@
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -72,9 +75,11 @@
private static IAsterixStateProxy proxy;
private ICCApplicationContext appCtx;
+ private IMessageBroker messageBroker;
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+ messageBroker = new CCMessageBroker((ClusterControllerService)ccAppCtx.getControllerService());
this.appCtx = ccAppCtx;
if (LOGGER.isLoggable(Level.INFO)) {
@@ -118,6 +123,7 @@
}
ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
+ ccAppCtx.setMessageBroker(messageBroker);
}
@Override
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 2cd6a1a..147a356 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -34,12 +34,13 @@
import org.apache.asterix.common.config.AsterixReplicationProperties;
import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.event.schema.cluster.SubstituteNodes;
+import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -54,6 +55,8 @@
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -75,6 +78,7 @@
private SystemState systemState = SystemState.NEW_UNIVERSE;
private boolean performedRemoteRecovery = false;
private boolean replicationEnabled = false;
+ private IMessageBroker messageBroker;
@Override
public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
@@ -91,6 +95,8 @@
ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager()));
ncApplicationContext = ncAppCtx;
+ messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService());
+ ncApplicationContext.setMessageBroker(messageBroker);
nodeId = ncApplicationContext.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -191,6 +197,9 @@
@Override
public void notifyStartupComplete() throws Exception {
+ //send max resource id on this NC to the CC
+ ((INCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
+
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMetadataProperties();
@@ -250,8 +259,8 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Configured:" + lccm);
}
- ncApplicationContext.setStateDumpHandler(new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm
- .getDumpPath(), lccm));
+ ncApplicationContext.setStateDumpHandler(
+ new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm.getDumpPath(), lccm));
lccm.startAll();
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
new file mode 100644
index 0000000..095ef1b
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
+import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
+import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.api.messages.IMessage;
+import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+
+public class CCMessageBroker implements IMessageBroker {
+
+ private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
+ private final AtomicLong globalResourceId = new AtomicLong(0);
+ private final ClusterControllerService ccs;
+ private final Set<String> nodesReportedMaxResourceId = new HashSet<>();
+ public static final long NO_CALLBACK_MESSAGE_ID = -1;
+
+ public CCMessageBroker(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public void receivedMessage(IMessage message, String nodeId) throws Exception {
+ AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+ switch (absMessage.getMessageType()) {
+ case RESOURCE_ID_REQUEST:
+ handleResourceIdRequest(message, nodeId);
+ break;
+ case REPORT_MAX_RESOURCE_ID_RESPONSE:
+ handleReportResourceMaxIdResponse(message, nodeId);
+ break;
+ default:
+ LOGGER.warning("Unknown message: " + absMessage.getMessageType());
+ break;
+ }
+ }
+
+ private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
+ ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message;
+ ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
+ reponse.setId(msg.getId());
+ //cluster is not active
+ if (!AsterixClusterProperties.isClusterActive()) {
+ reponse.setResourceId(-1);
+ reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
+ } else if (nodesReportedMaxResourceId.size() < AsterixClusterProperties.getNumberOfNodes()) {
+ //some node has not reported max resource id
+ reponse.setResourceId(-1);
+ reponse.setException(new Exception("One or more nodes has not reported max resource id."));
+ requestMaxResourceID();
+ } else {
+ reponse.setResourceId(globalResourceId.incrementAndGet());
+ }
+ sendApplicationMessageToNC(reponse, nodeId);
+ }
+
+ private synchronized void handleReportResourceMaxIdResponse(IMessage message, String nodeId) throws Exception {
+ ReportMaxResourceIdMessage msg = (ReportMaxResourceIdMessage) message;
+ globalResourceId.set(Math.max(msg.getMaxResourceId(), globalResourceId.get()));
+ nodesReportedMaxResourceId.add(nodeId);
+ }
+
+ private void sendApplicationMessageToNC(IMessage msg, String nodeId) throws Exception {
+ Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+ NodeControllerState state = nodeMap.get(nodeId);
+ state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+ }
+
+ private void requestMaxResourceID() throws Exception {
+ //send request to NCs that have not reported their max resource ids
+ Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+ ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
+ msg.setId(NO_CALLBACK_MESSAGE_ID);
+ for (String nodeId : getParticipantNodes) {
+ if (!nodesReportedMaxResourceId.contains(nodeId)) {
+ sendApplicationMessageToNC(msg, nodeId);
+ }
+ }
+ }
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
new file mode 100644
index 0000000..001771e
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.hyracks.api.messages.IMessage;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class NCMessageBroker implements INCMessageBroker {
+ private final NodeControllerService ncs;
+ private final AtomicLong messageId = new AtomicLong(0);
+ private final Map<Long, IApplicationMessageCallback> callbacks;
+
+ public NCMessageBroker(NodeControllerService ncs) {
+ this.ncs = ncs;
+ callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+ }
+
+ @Override
+ public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
+ if (callback != null) {
+ long uniqueMessageId = messageId.incrementAndGet();
+ message.setId(uniqueMessageId);
+ callbacks.put(uniqueMessageId, callback);
+ }
+ try {
+ ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
+ } catch (Exception e) {
+ if (callback != null) {
+ //remove the callback in case of failure
+ callbacks.remove(message.getId());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void receivedMessage(IMessage message, String nodeId) throws Exception {
+ AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+ //if the received message is a response to a sent message, deliver it to the sender
+ IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
+ if (callback != null) {
+ callback.deliverMessageResponse(absMessage);
+ }
+
+ //handle requests from CC
+ switch (absMessage.getMessageType()) {
+ case REPORT_MAX_RESOURCE_ID_REQUEST:
+ reportMaxResourceId();
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public void reportMaxResourceId() throws Exception {
+ IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+ .getApplicationObject();
+ ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
+ //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
+ long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+ maxResourceIdMsg.setMaxResourceId(maxResourceId);
+ sendMessage(maxResourceIdMsg, null);
+ }
+}
diff --git a/asterix-common/pom.xml b/asterix-common/pom.xml
index 512502b..02f651f 100644
--- a/asterix-common/pom.xml
+++ b/asterix-common/pom.xml
@@ -215,7 +215,15 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.2.0</version>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 94f5b2f..cd829e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -39,6 +39,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
public interface IAsterixAppRuntimeContext {
@@ -65,7 +66,7 @@
public IDatasetLifecycleManager getDatasetLifecycleManager();
- public ResourceIdFactory getResourceIdFactory();
+ public IResourceIdFactory getResourceIdFactory();
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
new file mode 100644
index 0000000..fbb9b86
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+
+public abstract class AbstractApplicationMessage implements IApplicationMessage {
+ private static final long serialVersionUID = 1L;
+ private long id;
+
+ @Override
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public long getId() {
+ return id;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
new file mode 100644
index 0000000..a2b94a7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+ public long maxResourceId;
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_RESPONSE;
+ }
+
+ public long getMaxResourceId() {
+ return maxResourceId;
+ }
+
+ public void setMaxResourceId(long maxResourceId) {
+ this.maxResourceId = maxResourceId;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
new file mode 100644
index 0000000..d2837ce
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+ public long maxResourceId;
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_REQUEST;
+ }
+
+ public long getMaxResourceId() {
+ return maxResourceId;
+ }
+
+ public void setMaxResourceId(long maxResourceId) {
+ this.maxResourceId = maxResourceId;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
new file mode 100644
index 0000000..daeb9c4
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.RESOURCE_ID_REQUEST;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
new file mode 100644
index 0000000..09c50d3
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+
+ private long resourceId;
+ private Exception exception;
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.RESOURCE_ID_RESPONSE;
+ }
+
+ public long getResourceId() {
+ return resourceId;
+ }
+
+ public void setResourceId(long resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public void setException(Exception exception) {
+ this.exception = exception;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
new file mode 100644
index 0000000..61ab7cd
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessage;
+
+public interface IApplicationMessage extends IMessage {
+
+ public enum ApplicationMessageType {
+ RESOURCE_ID_REQUEST,
+ RESOURCE_ID_RESPONSE,
+ REPORT_MAX_RESOURCE_ID_REQUEST,
+ REPORT_MAX_RESOURCE_ID_RESPONSE
+ }
+
+ public abstract ApplicationMessageType getMessageType();
+
+ /**
+ * Sets a unique message id that identifies this message within an NC.
+ * This id is set by {@link INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+ * when the callback is not null to notify the sender when the response to that message is received.
+ *
+ * @param messageId
+ */
+ public void setId(long messageId);
+
+ /**
+ * @return The unique message id if it has been set, otherwise 0.
+ */
+ public long getId();
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
new file mode 100644
index 0000000..3bad5fb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+public interface IApplicationMessageCallback {
+
+ /**
+ * Notifies the message sender when the response has been received.
+ *
+ * @param message
+ * The response message
+ */
+ public void deliverMessageResponse(IApplicationMessage message);
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
new file mode 100644
index 0000000..3ff83b6
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+public interface INCMessageBroker extends IMessageBroker {
+
+ /**
+ * Sends application message from this NC to the CC.
+ *
+ * @param message
+ * @param callback
+ * @throws Exception
+ */
+ public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+
+ /**
+ * Sends the maximum resource id on this NC to the CC.
+ *
+ * @throws Exception
+ */
+ public void reportMaxResourceId() throws Exception;
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index d308564..6382af9 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -30,7 +30,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public interface IAsterixAppRuntimeContextProvider {
@@ -52,7 +52,7 @@
public ILocalResourceRepository getLocalResourceRepository();
- public ResourceIdFactory getResourceIdFactory();
+ public IResourceIdFactory getResourceIdFactory();
public IIOManager getIOManager();
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 47203cf..c402bef 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -41,6 +41,7 @@
import org.apache.asterix.metadata.api.IMetadataIndex;
import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.api.IValueExtractor;
+import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import org.apache.asterix.metadata.bootstrap.MetadataSecondaryIndexes;
import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -108,7 +109,7 @@
public class MetadataNode implements IMetadataNode {
private static final long serialVersionUID = 1L;
- private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID);
+ private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataIndexImmutableProperties.METADATA.getDatasetId());
private IDatasetLifecycleManager datasetLifecycleManager;
private ITransactionSubsystem transactionSubsystem;
@@ -1087,7 +1088,7 @@
@Override
public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException {
- int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
+ int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
try {
String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index c068657..d76af86 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -20,7 +20,6 @@
package org.apache.asterix.metadata.bootstrap;
import java.io.File;
-import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -39,7 +38,6 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataException;
@@ -73,7 +71,6 @@
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
@@ -223,7 +220,7 @@
}
}
- public static void stopUniverse() throws HyracksDataException {
+ public static void stopUniverse() {
// Close all BTree files in BufferCache.
// metadata datasets will be closed when the dataset life cycle manger is closed
}
@@ -404,7 +401,7 @@
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
null, null, null, null, true);
lsmBtree.create();
- resourceID = runtimeContext.getResourceIdFactory().createId();
+ resourceID = index.getResourceID();
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
comparatorFactories, bloomFilterKeyFields, index.isPrimaryIndex(), index.getDatasetId().getId(),
runtimeContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES,
@@ -443,7 +440,7 @@
return metadataNodeName;
}
- public static void startDDLRecovery() throws RemoteException, ACIDException, MetadataException {
+ public static void startDDLRecovery() throws MetadataException {
//#. clean up any record which has pendingAdd/DelOp flag
// as traversing all records from DATAVERSE_DATASET to DATASET_DATASET, and then to INDEX_DATASET.
String dataverseName = null;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index 3a76db7..9ef9f84 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -79,9 +79,9 @@
// PrimaryKeyField indexes used for secondary index operations
protected final int[] primaryKeyIndexes;
- public MetadataIndex(String datasetName, String indexName, int numFields, IAType[] keyTypes,
- List<List<String>> keyNames, int numSecondaryIndexKeys, ARecordType payloadType, int datasetId,
- boolean isPrimaryIndex, int[] primaryKeyIndexes) throws AsterixRuntimeException {
+ public MetadataIndex(MetadataIndexImmutableProperties indexImmutableProperties, int numFields, IAType[] keyTypes,
+ List<List<String>> keyNames, int numSecondaryIndexKeys, ARecordType payloadType, boolean isPrimaryIndex,
+ int[] primaryKeyIndexes) throws AsterixRuntimeException {
// Sanity checks.
if (keyTypes.length != keyNames.size()) {
throw new AsterixRuntimeException("Unequal number of key types and names given.");
@@ -90,12 +90,8 @@
throw new AsterixRuntimeException("Number of keys given is greater than total number of fields.");
}
// Set simple fields.
- this.datasetName = datasetName;
- if (indexName == null) {
- this.indexName = datasetName;
- } else {
- this.indexName = indexName;
- }
+ this.datasetName = indexImmutableProperties.getDatasetName();
+ this.indexName = indexImmutableProperties.getIndexName();
this.keyTypes = keyTypes;
this.keyNames = keyNames;
this.payloadType = payloadType;
@@ -147,11 +143,12 @@
}
}
- this.datasetId = new DatasetId(datasetId);
+ this.datasetId = new DatasetId(indexImmutableProperties.getDatasetId());
this.isPrimaryIndex = isPrimaryIndex;
//PrimaryKeyFieldIndexes
this.primaryKeyIndexes = primaryKeyIndexes;
+ this.resourceId = indexImmutableProperties.getResourceId();
}
@Override
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
new file mode 100644
index 0000000..9b4d0d1
--- /dev/null
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.bootstrap;
+
+public enum MetadataIndexImmutableProperties {
+ METADATA(MetadataConstants.METADATA_DATAVERSE_NAME, 0, 0),
+ DATAVERSE("Dataverse", 1, 1),
+ DATASET("Dataset", 2, 2),
+ DATATYPE("Datatype", 3, 3),
+ INDEX("Index", 4, 4),
+ NODE("Node", 5, 5),
+ NODEGROUP("Nodegroup", 6, 6),
+ FUNCTION("Function", 7, 7),
+ DATASOURCE_ADAPTER("DatasourceAdapter", 8, 8),
+ LIBRARY("Library", 9, 9),
+ FEED("Feed", 10, 10),
+ FEED_ACTIVITY_DATASET_ID("FeedActivity", 11, 11),
+ FEED_POLICY("FeedPolicy", 12, 12),
+ COMPACTION_POLICY("CompactionPolicy", 13, 13),
+ EXTERNAL_FILE("ExternalFile", 14, 14),
+ GROUPNAME_ON_DATASET("GroupName", DATASET, 15),
+ DATATYPE_NAME_ON_DATASET("DatatypeName", DATASET, 16),
+ DATATYPE_NAME_ON_DATATYPE("DatatypeName", DATATYPE, 17);
+
+ private final String indexName;
+ private final int datasetId;
+ private final long resourceId;
+ private final MetadataIndexImmutableProperties dataset;
+
+ public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
+
+ private MetadataIndexImmutableProperties(String indexName, int datasetId, long resourceId) {
+ this.indexName = indexName;
+ this.datasetId = datasetId;
+ this.resourceId = resourceId;
+ //a primary index's dataset is itself
+ this.dataset = this;
+ }
+
+ private MetadataIndexImmutableProperties(String indexName, MetadataIndexImmutableProperties dataset,
+ long resourceId) {
+ this.indexName = indexName;
+ this.datasetId = dataset.datasetId;
+ this.resourceId = resourceId;
+ this.dataset = dataset;
+ }
+
+ public long getResourceId() {
+ return resourceId;
+ }
+
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public String getDatasetName() {
+ return dataset.indexName;
+ }
+
+ public int getDatasetId() {
+ return dataset.datasetId;
+ }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 9bcad8f..258e1c4 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -46,29 +46,9 @@
public static IMetadataIndex COMPACTION_POLICY_DATASET;
public static IMetadataIndex EXTERNAL_FILE_DATASET;
- public static final int METADATA_DATASET_ID = 0;
- public static final int DATAVERSE_DATASET_ID = 1;
- public static final int DATASET_DATASET_ID = 2;
- public static final int DATATYPE_DATASET_ID = 3;
- public static final int INDEX_DATASET_ID = 4;
- public static final int NODE_DATASET_ID = 5;
- public static final int NODEGROUP_DATASET_ID = 6;
- public static final int FUNCTION_DATASET_ID = 7;
- public static final int DATASOURCE_ADAPTER_DATASET_ID = 8;
-
- public static final int LIBRARY_DATASET_ID = 9;
- public static final int FEED_DATASET_ID = 10;
- public static final int FEED_ACTIVITY_DATASET_ID = 11;
- public static final int FEED_POLICY_DATASET_ID = 12;
- public static final int COMPACTION_POLICY_DATASET_ID = 13;
- public static final int EXTERNAL_FILE_DATASET_ID = 14;
-
- public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
-
/**
* Create all metadata primary index descriptors. MetadataRecordTypes must
* have been initialized before calling this init.
- *
* @throws MetadataException
* If MetadataRecordTypes have not been initialized.
*/
@@ -79,63 +59,70 @@
"Must initialize MetadataRecordTypes before initializing MetadataPrimaryIndexes");
}
- DATAVERSE_DATASET = new MetadataIndex("Dataverse", null, 2, new IAType[] { BuiltinType.ASTRING },
- (Arrays.asList(Arrays.asList("DataverseName"))), 0, MetadataRecordTypes.DATAVERSE_RECORDTYPE,
- DATAVERSE_DATASET_ID, true, new int[] { 0 });
+ DATAVERSE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATAVERSE, 2,
+ new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"))), 0,
+ MetadataRecordTypes.DATAVERSE_RECORDTYPE, true, new int[] { 0 });
- DATASET_DATASET = new MetadataIndex("Dataset", null, 3,
- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(
- Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 0,
- MetadataRecordTypes.DATASET_RECORDTYPE, DATASET_DATASET_ID, true, new int[] { 0, 1 });
+ DATASET_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATASET, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 0,
+ MetadataRecordTypes.DATASET_RECORDTYPE, true, new int[] { 0, 1 });
- DATATYPE_DATASET = new MetadataIndex("Datatype", null, 3, new IAType[] { BuiltinType.ASTRING,
- BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"))),
- 0, MetadataRecordTypes.DATATYPE_RECORDTYPE, DATATYPE_DATASET_ID, true, new int[] { 0, 1 });
+ DATATYPE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"))), 0,
+ MetadataRecordTypes.DATATYPE_RECORDTYPE, true, new int[] { 0, 1 });
- INDEX_DATASET = new MetadataIndex("Index", null, 4, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
- BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"),
- Arrays.asList("IndexName"))), 0, MetadataRecordTypes.INDEX_RECORDTYPE, INDEX_DATASET_ID, true,
- new int[] { 0, 1, 2 });
+ INDEX_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.INDEX, 4,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"),
+ Arrays.asList("IndexName"))),
+ 0, MetadataRecordTypes.INDEX_RECORDTYPE, true, new int[] { 0, 1, 2 });
- NODE_DATASET = new MetadataIndex("Node", null, 2, new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays
- .asList("NodeName"))), 0, MetadataRecordTypes.NODE_RECORDTYPE, NODE_DATASET_ID, true, new int[] { 0 });
+ NODE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.NODE, 2, new IAType[] { BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("NodeName"))), 0, MetadataRecordTypes.NODE_RECORDTYPE, true,
+ new int[] { 0 });
- NODEGROUP_DATASET = new MetadataIndex("Nodegroup", null, 2, new IAType[] { BuiltinType.ASTRING },
- (Arrays.asList(Arrays.asList("GroupName"))), 0, MetadataRecordTypes.NODEGROUP_RECORDTYPE,
- NODEGROUP_DATASET_ID, true, new int[] { 0 });
+ NODEGROUP_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.NODEGROUP, 2,
+ new IAType[] { BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("GroupName"))), 0,
+ MetadataRecordTypes.NODEGROUP_RECORDTYPE, true, new int[] { 0 });
- FUNCTION_DATASET = new MetadataIndex("Function", null, 4, new IAType[] { BuiltinType.ASTRING,
- BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"),
- Arrays.asList("Name"), Arrays.asList("Arity"))), 0, MetadataRecordTypes.FUNCTION_RECORDTYPE,
- FUNCTION_DATASET_ID, true, new int[] { 0, 1, 2 });
+ FUNCTION_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FUNCTION, 4,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("Name"), Arrays.asList("Arity"))), 0,
+ MetadataRecordTypes.FUNCTION_RECORDTYPE, true, new int[] { 0, 1, 2 });
- DATASOURCE_ADAPTER_DATASET = new MetadataIndex("DatasourceAdapter", null, 3, new IAType[] {
- BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"),
- Arrays.asList("Name"))), 0, MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE,
- DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0, 1 });
-
- FEED_DATASET = new MetadataIndex("Feed", null, 3, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
- (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("FeedName"))), 0,
- MetadataRecordTypes.FEED_RECORDTYPE, FEED_DATASET_ID, true, new int[] { 0, 1 });
-
- LIBRARY_DATASET = new MetadataIndex("Library", null, 3,
- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(
- Arrays.asList("DataverseName"), Arrays.asList("Name"))), 0,
- MetadataRecordTypes.LIBRARY_RECORDTYPE, LIBRARY_DATASET_ID, true, new int[] { 0, 1 });
-
- FEED_POLICY_DATASET = new MetadataIndex("FeedPolicy", null, 3, new IAType[] { BuiltinType.ASTRING,
- BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("PolicyName"))), 0,
- MetadataRecordTypes.FEED_POLICY_RECORDTYPE, FEED_POLICY_DATASET_ID, true, new int[] { 0, 1 });
-
- COMPACTION_POLICY_DATASET = new MetadataIndex("CompactionPolicy", null, 3, new IAType[] { BuiltinType.ASTRING,
- BuiltinType.ASTRING },
- (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("CompactionPolicy"))), 0,
- MetadataRecordTypes.COMPACTION_POLICY_RECORDTYPE, COMPACTION_POLICY_DATASET_ID, true,
+ DATASOURCE_ADAPTER_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.DATASOURCE_ADAPTER, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("Name"))), 0,
+ MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, true,
new int[] { 0, 1 });
- EXTERNAL_FILE_DATASET = new MetadataIndex("ExternalFile", null, 4, new IAType[] { BuiltinType.ASTRING,
- BuiltinType.ASTRING, BuiltinType.AINT32 }, (Arrays.asList(Arrays.asList("DataverseName"),
- Arrays.asList("DatasetName"), Arrays.asList("FileNumber"))), 0,
- MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, EXTERNAL_FILE_DATASET_ID, true, new int[] { 0, 1, 2 });
+ FEED_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FEED, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("FeedName"))), 0,
+ MetadataRecordTypes.FEED_RECORDTYPE, true, new int[] { 0, 1 });
+
+ LIBRARY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.LIBRARY, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("Name"))), 0,
+ MetadataRecordTypes.LIBRARY_RECORDTYPE, true, new int[] { 0, 1 });
+
+ FEED_POLICY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.FEED_POLICY, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("PolicyName"))), 0,
+ MetadataRecordTypes.FEED_POLICY_RECORDTYPE, true, new int[] { 0, 1 });
+
+ COMPACTION_POLICY_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.COMPACTION_POLICY, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("CompactionPolicy"))), 0,
+ MetadataRecordTypes.COMPACTION_POLICY_RECORDTYPE, true,
+ new int[] { 0, 1 });
+
+ EXTERNAL_FILE_DATASET = new MetadataIndex(MetadataIndexImmutableProperties.EXTERNAL_FILE, 4,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatasetName"),
+ Arrays.asList("FileNumber"))),
+ 0, MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, true, new int[] { 0, 1, 2 });
}
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
index 651021c..fbe339f 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataSecondaryIndexes.java
@@ -37,7 +37,6 @@
/**
* Create all metadata secondary index descriptors. MetadataRecordTypes must
* have been initialized before calling this init.
- *
* @throws MetadataException
* If MetadataRecordTypes have not been initialized.
*/
@@ -48,21 +47,23 @@
"Must initialize MetadataRecordTypes before initializing MetadataSecondaryIndexes.");
}
- GROUPNAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "GroupName", 3, new IAType[] { BuiltinType.ASTRING,
- BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(Arrays.asList("GroupName"),
- Arrays.asList("DataverseName"), Arrays.asList("DatasetName"))), 1, null,
- MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 1, 2 });
+ GROUPNAME_ON_DATASET_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.GROUPNAME_ON_DATASET, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("GroupName"), Arrays.asList("DataverseName"),
+ Arrays.asList("DatasetName"))),
+ 1, null, false, new int[] { 1, 2 });
- DATATYPENAME_ON_DATASET_INDEX = new MetadataIndex("Dataset", "DatatypeName", 3, new IAType[] {
- BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, (Arrays.asList(
- Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"), Arrays.asList("DatasetName"))), 2, null,
- MetadataPrimaryIndexes.DATASET_DATASET_ID, false, new int[] { 0, 2 });
+ DATATYPENAME_ON_DATASET_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE_NAME_ON_DATASET, 3,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ (Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("DatatypeName"),
+ Arrays.asList("DatasetName"))),
+ 2, null, false, new int[] { 0, 2 });
- DATATYPENAME_ON_DATATYPE_INDEX = new MetadataIndex("Datatype", "DatatypeName", 3, new IAType[] {
- BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ DATATYPENAME_ON_DATATYPE_INDEX = new MetadataIndex(MetadataIndexImmutableProperties.DATATYPE_NAME_ON_DATATYPE,
+ 3, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
(Arrays.asList(Arrays.asList("DataverseName"), Arrays.asList("NestedDatatypeName"),
- Arrays.asList("TopDatatypeName"))), 2, null, MetadataPrimaryIndexes.DATATYPE_DATASET_ID, false,
- new int[] { 0, 2 });
+ Arrays.asList("TopDatatypeName"))),
+ 2, null, false, new int[] { 0, 2 });
}
}
\ No newline at end of file
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 438805a..f2482da 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -185,4 +185,16 @@
public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
this.globalRecoveryCompleted = globalRecoveryCompleted;
}
+
+ public static boolean isClusterActive() {
+ if (AsterixClusterProperties.INSTANCE.getCluster() == null) {
+ //this is a virtual cluster
+ return true;
+ }
+ return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
+ }
+
+ public static int getNumberOfNodes(){
+ return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
+ }
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
new file mode 100644
index 0000000..ca7ba51
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.resource;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
+import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.application.IApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+
+/**
+ * A resource id factory that generates unique resource ids across all NCs by requesting unique ids from the cluster controller.
+ */
+public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
+
+ private final IApplicationContext appCtx;
+ private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
+
+ public GlobalResourceIdFactory(IApplicationContext appCtx) {
+ this.appCtx = appCtx;
+ this.resourceIdResponseQ = new LinkedBlockingQueue<>();
+ }
+
+ @Override
+ public long createId() throws HyracksDataException {
+ try {
+ ResourceIdRequestResponseMessage reponse = null;
+ //if there already exists a response, use it
+ if (resourceIdResponseQ.size() > 0) {
+ synchronized (resourceIdResponseQ) {
+ if (resourceIdResponseQ.size() > 0) {
+ reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+ }
+ }
+ }
+ //if no response available or it has an exception, request a new one
+ if (reponse == null || reponse.getException() != null) {
+ ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
+ ((INCMessageBroker) appCtx.getMessageBroker()).sendMessage(msg, this);
+ reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+ if (reponse.getException() != null) {
+ throw new HyracksDataException(reponse.getException().getMessage());
+ }
+ }
+ return reponse.getResourceId();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void deliverMessageResponse(IApplicationMessage message) {
+ resourceIdResponseQ.offer(message);
+ }
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
new file mode 100644
index 0000000..ec42139
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.resource;
+
+import org.apache.hyracks.api.application.IApplicationContext;
+
+public class GlobalResourceIdFactoryProvider {
+
+ private final IApplicationContext appCtx;
+
+ public GlobalResourceIdFactoryProvider(IApplicationContext appCtx) {
+ this.appCtx = appCtx;
+ }
+
+ public GlobalResourceIdFactory createResourceIdFactory() {
+ return new GlobalResourceIdFactory(appCtx);
+ }
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 1686e17..01a451c 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -28,7 +28,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
ILSMIOOperationSchedulerProvider {
@@ -71,7 +71,7 @@
}
@Override
- public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+ public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
.getResourceIdFactory();
}