Asterix MessageBroker implementation
This change includes the following:
- Add implementation for CC/NC MessageBroker.
- Implement GlobalResourceIdFactory using MessageBroker.
- Change resource id factory to GlobalResourceIdFactory.
- Refactor the fixed properties of metadata indexes.
- Use fixed resource ids for metadata indexes.
Change-Id: If4320e2c5a0130d2f86a4be6ae61f5cee43e30af
Reviewed-on: https://asterix-gerrit.ics.uci.edu/486
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-common/pom.xml b/asterix-common/pom.xml
index 512502b..02f651f 100644
--- a/asterix-common/pom.xml
+++ b/asterix-common/pom.xml
@@ -215,7 +215,15 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.2.0</version>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 94f5b2f..cd829e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -39,6 +39,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
public interface IAsterixAppRuntimeContext {
@@ -65,7 +66,7 @@
public IDatasetLifecycleManager getDatasetLifecycleManager();
- public ResourceIdFactory getResourceIdFactory();
+ public IResourceIdFactory getResourceIdFactory();
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
new file mode 100644
index 0000000..fbb9b86
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+/** Base for application messages; keeps the message id behind {@link #setId(long)} and {@link #getId()}. */
+public abstract class AbstractApplicationMessage implements IApplicationMessage {
+ private static final long serialVersionUID = 1L;
+ private long id; // only written by setId(long); stays 0 until that is called
+
+ @Override
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public long getId() {
+ return id;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
new file mode 100644
index 0000000..a2b94a7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+/** Application message holding a maximum resource id; reports type REPORT_MAX_RESOURCE_ID_RESPONSE. */
+public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+ public long maxResourceId; // NOTE(review): public although accessors exist below - consider private
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_RESPONSE;
+ }
+
+ public long getMaxResourceId() {
+ return maxResourceId;
+ }
+
+ public void setMaxResourceId(long maxResourceId) {
+ this.maxResourceId = maxResourceId;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
new file mode 100644
index 0000000..d2837ce
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+/** Application message that reports type REPORT_MAX_RESOURCE_ID_REQUEST. */
+public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+ public long maxResourceId; // NOTE(review): public, and a request that carries a max id looks odd - confirm it is needed
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_REQUEST;
+ }
+
+ public long getMaxResourceId() {
+ return maxResourceId;
+ }
+
+ public void setMaxResourceId(long maxResourceId) {
+ this.maxResourceId = maxResourceId;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
new file mode 100644
index 0000000..daeb9c4
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+/** Application message with no instance fields of its own; reports type RESOURCE_ID_REQUEST. */
+public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.RESOURCE_ID_REQUEST;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
new file mode 100644
index 0000000..09c50d3
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+/** Application message that reports type RESOURCE_ID_RESPONSE and carries a resource id and an exception. */
+public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
+ private static final long serialVersionUID = 1L;
+
+ private long resourceId; // 0 until setResourceId is called
+ private Exception exception; // null until setException is called; presumably marks a failed request - TODO confirm
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.RESOURCE_ID_RESPONSE;
+ }
+
+ public long getResourceId() {
+ return resourceId;
+ }
+
+ public void setResourceId(long resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public void setException(Exception exception) {
+ this.exception = exception;
+ }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
new file mode 100644
index 0000000..61ab7cd
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessage;
+/** Contract for application-level messages: each exposes a message type and a numeric id. */
+public interface IApplicationMessage extends IMessage {
+
+ public enum ApplicationMessageType {
+ RESOURCE_ID_REQUEST,
+ RESOURCE_ID_RESPONSE,
+ REPORT_MAX_RESOURCE_ID_REQUEST,
+ REPORT_MAX_RESOURCE_ID_RESPONSE
+ }
+
+ public abstract ApplicationMessageType getMessageType(); // the ApplicationMessageType constant for this message
+
+ /**
+ * Sets a unique message id that identifies this message within an NC.
+ * This id is set by {@link INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+ * when the callback is not null to notify the sender when the response to that message is received.
+ *
+ * @param messageId the id to assign to this message
+ */
+ public void setId(long messageId);
+
+ /**
+ * @return The unique message id if it has been set, otherwise 0.
+ */
+ public long getId();
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
new file mode 100644
index 0000000..3bad5fb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+/** Callback through which the sender of an application message is handed the response to it. */
+public interface IApplicationMessageCallback {
+
+ /**
+ * Notifies the message sender when the response has been received.
+ *
+ * @param message
+ * The response message
+ */
+ public void deliverMessageResponse(IApplicationMessage message);
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
new file mode 100644
index 0000000..3ff83b6
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+/** NC-side message broker API: sends application messages and this NC's maximum resource id to the CC. */
+public interface INCMessageBroker extends IMessageBroker {
+
+ /**
+ * Sends application message from this NC to the CC.
+ *
+ * @param message the application message to send
+ * @param callback receives the response to the message; see {@link IApplicationMessage#setId(long)} for the null case
+ * @throws Exception
+ */
+ public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+
+ /**
+ * Sends the maximum resource id on this NC to the CC.
+ *
+ * @throws Exception
+ */
+ public void reportMaxResourceId() throws Exception;
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index d308564..6382af9 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -30,7 +30,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public interface IAsterixAppRuntimeContextProvider {
@@ -52,7 +52,7 @@
public ILocalResourceRepository getLocalResourceRepository();
- public ResourceIdFactory getResourceIdFactory();
+ public IResourceIdFactory getResourceIdFactory();
public IIOManager getIOManager();