[NO ISSUE][HYR] Binary compatibility enhancements
Infrastructure & changes to enable binary compatibility with 0.9.4
Change-Id: I77d4919be4853d9afe9b0137861cff3b1d751e20
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3128
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index d89004b..3e72b7d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -78,8 +78,8 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
@@ -435,11 +435,18 @@
if (metadataNodeStub == null) {
final INetworkSecurityManager networkSecurityManager =
ncServiceContext.getControllerService().getNetworkSecurityManager();
- final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
- final RMIClientFactory clientSocketFactory =
- new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled());
- metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
- getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory);
+
+ // clients need to have the client factory on their classpath- to enable older clients, only use
+ // our client socket factory when SSL is enabled
+ if (networkSecurityManager.getConfiguration().isSslEnabled()) {
+ final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
+ final RMIClientFactory clientSocketFactory = new RMIClientFactory(true);
+ metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+ getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory);
+ } else {
+ metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+ getMetadataProperties().getMetadataPort());
+ }
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 4fa86ae..97316d2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -64,7 +64,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.BaseNCApplication;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -132,10 +131,10 @@
}
runtimeContext.initialize(getRecoveryManagerFactory(), runtimeContext.getNodeProperties().isInitialRun());
MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
- IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
+ NCMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
this.ncServiceCtx.setMessageBroker(messageBroker);
MessagingChannelInterfaceFactory interfaceFactory =
- new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
+ new MessagingChannelInterfaceFactory(messageBroker, messagingProperties);
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
final Checkpoint latestCheckpoint = runtimeContext.getTransactionSubsystem().getCheckpointManager().getLatest();
if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 4c971e2..d6af749 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -43,11 +43,18 @@
public static IAsterixStateProxy registerRemoteObject(INetworkSecurityManager networkSecurityManager,
int metadataCallbackPort) throws RemoteException {
- final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
- final RMIClientFactory clientSocketFactory =
- new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled());
- final IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort,
- clientSocketFactory, serverSocketFactory);
+ IAsterixStateProxy stub;
+ // clients need to have the client factory on their classpath- to enable older clients, only use
+ // our client socket factory when SSL is enabled
+ if (networkSecurityManager.getConfiguration().isSslEnabled()) {
+ final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
+ final RMIClientFactory clientSocketFactory =
+ new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled());
+ stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort, clientSocketFactory,
+ serverSocketFactory);
+ } else {
+ stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort);
+ }
LOGGER.info("Asterix Distributed State Proxy Bound");
return stub;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
index 6e73a40..779fba2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.aggregates.std;
import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -31,7 +32,17 @@
public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- public static final IFunctionDescriptorFactory FACTORY = SqlSumAggregateDescriptor::new;
+
+ // this must remain an anonymous inner class due to the evaluator factory below being an anonymous inner
+ // serializable class, to not break binary compatibility
+ // this can be reverted once serialization compatibility code is in place to write the correct class
+ @SuppressWarnings({ "Anonymous2MethodRef", "Convert2Lambda" })
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SqlSumAggregateDescriptor();
+ }
+ };
@Override
public FunctionIdentifier getIdentifier() {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
similarity index 97%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
index 7d0dd61..c0445a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.client.impl;
+package org.apache.hyracks.api.client;
import java.io.Serializable;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
similarity index 99%
rename from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index a61c96d..72bdc3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.ipc.impl;
+package org.apache.hyracks.api.client;
import java.io.Serializable;
import java.net.URL;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index e92db5e..4cc47d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java
new file mode 100644
index 0000000..deaa966
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+public interface IJavaSerializationProvider {
+ default ObjectOutputStream newObjectOutputStream(OutputStream out) throws IOException {
+ return new ObjectOutputStream(out);
+ }
+
+ default ObjectInputStream newObjectInputStream(InputStream in) throws IOException {
+ return new ObjectInputStream(in);
+ }
+
+ default void readObject(ObjectInputStream in, Object object) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java
index 5f11214..c66b4fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java
@@ -65,7 +65,7 @@
}
default String ini() {
- return name().toLowerCase().replace("_", ".");
+ return toIni(name());
}
default String camelCase() {
@@ -75,4 +75,12 @@
default String toIniString() {
return "[" + section().sectionName() + "] " + ini();
}
+
+ static String toIni(String name) {
+ return name.toLowerCase().replace("_", ".");
+ }
+
+ default SerializedOption toSerializable() {
+ return new SerializedOption(this);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java
new file mode 100644
index 0000000..8263af6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.config;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public final class SerializedOption implements Serializable, Comparable {
+ private static final long serialVersionUID = 1L;
+ private final String sectionName;
+ private final String optionName;
+
+ SerializedOption(IOption option) {
+ this.sectionName = option.section().name();
+ this.optionName = option.name();
+ }
+
+ public String optionName() {
+ return optionName;
+ }
+
+ public Section section() {
+ return Section.valueOf(sectionName);
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (!(o instanceof SerializedOption)) {
+ return -1;
+ }
+ SerializedOption that = (SerializedOption) o;
+ int sectionComp = sectionName.compareTo(that.sectionName);
+ return sectionComp != 0 ? sectionComp : optionName.compareTo(that.optionName);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SerializedOption that = (SerializedOption) o;
+ return Objects.equals(sectionName, that.sectionName) && Objects.equals(optionName, that.optionName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sectionName, optionName);
+ }
+
+ @Override
+ public String toString() {
+ return "[" + sectionName + "] " + optionName;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
index c3da155..83e0482 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
@@ -22,7 +22,7 @@
import java.util.Map;
import java.util.Set;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.topology.ClusterTopology;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
index a92e700..b38d343 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
@@ -29,20 +29,25 @@
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
+import org.apache.hyracks.api.comm.IJavaSerializationProvider;
+
public class JavaSerializationUtils {
+ private static IJavaSerializationProvider serProvider = new IJavaSerializationProvider() {
+ };
+
public static byte[] serialize(Serializable jobSpec) throws IOException {
if (jobSpec instanceof byte[]) {
return (byte[]) jobSpec;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
+ ObjectOutputStream oos = serProvider.newObjectOutputStream(baos);
oos.writeObject(jobSpec);
return baos.toByteArray();
}
public static byte[] serialize(Serializable jobSpec, ClassLoader classLoader) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
+ ObjectOutputStream oos = serProvider.newObjectOutputStream(baos);
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(classLoader);
@@ -57,7 +62,7 @@
if (bytes == null) {
return null;
}
- ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+ ObjectInputStream ois = serProvider.newObjectInputStream(new ByteArrayInputStream(bytes));
return ois.readObject();
}
@@ -78,6 +83,18 @@
return Class.forName(className);
}
+ public static void setSerializationProvider(IJavaSerializationProvider serProvider) {
+ JavaSerializationUtils.serProvider = serProvider;
+ }
+
+ public static IJavaSerializationProvider getSerializationProvider() {
+ return serProvider;
+ }
+
+ public static void readObject(ObjectInputStream in, Object object) throws IOException, ClassNotFoundException {
+ serProvider.readObject(in, object);
+ }
+
private static class ClassLoaderObjectInputStream extends ObjectInputStream {
private ClassLoader classLoader;
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
index e802ef9..6260dd6 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
@@ -25,7 +25,7 @@
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.RPCInterface;
-import org.apache.hyracks.ipc.impl.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
public class ResultDirectoryRemoteProxy implements IResultDirectory {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index f2ea988..deb6785 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -44,7 +44,7 @@
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
import org.apache.hyracks.ipc.exceptions.IPCException;
-import org.apache.hyracks.ipc.impl.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c2e7b22..419dff6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -36,7 +36,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.application.ICCApplication;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.context.ICCContext;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 4f76ced..3e72942 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -46,6 +46,7 @@
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.util.annotations.Idempotent;
import org.apache.hyracks.util.annotations.NotThreadSafe;
@@ -218,7 +219,7 @@
}
private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
- String ipAddress = ncState.getNCConfig().getDataPublicAddress();
+ String ipAddress = (String) ncState.getConfig().get(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable());
try {
return InetAddress.getByName(ipAddress);
} catch (UnknownHostException e) {
@@ -237,8 +238,8 @@
state.getNodeController().shutdown(false);
LOGGER.warn("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication",
nodeId);
- } catch (Exception ignore) {
- LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ignore);
+ } catch (Exception ex) {
+ LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ex);
}
});
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index aa7dca1..30115ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -37,7 +37,6 @@
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.config.ConfigUtils;
import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.work.IPCResponder;
import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.util.MXHelper;
@@ -89,10 +88,7 @@
if (ncs != null) {
detail = ncs.toDetailedJSON(includeStats, includeConfig);
if (includeConfig) {
- final NCConfig ncConfig = ncs.getNCConfig();
- ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId),
- NC_SECTIONS);
- detail.putPOJO("app.args", ncConfig.getAppArgs());
+ ConfigUtils.addConfigToJSON(detail, ncs.getConfig(), ccConfig.getConfigManager(), NC_SECTIONS);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index fe33bc9..0ea9239 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -21,11 +21,11 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
@@ -62,11 +62,16 @@
try {
NodeControllerState state = new NodeControllerState(nc, reg);
nodeManager.addNode(id, state);
- IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
final Map<IOption, Object> ncConfiguration = new HashMap<>();
- for (IOption option : cfg.getOptions()) {
- ncConfiguration.put(option, cfg.get(option));
- }
+ ConfigManager configManager = ccs.getConfig().getConfigManager();
+ state.getConfig().forEach((key, value) -> {
+ IOption option = configManager.lookupOption(key);
+ if (option == null) {
+ LOGGER.info("discarding unknown option {}", key);
+ } else {
+ ncConfiguration.put(option, value);
+ }
+ });
LOGGER.info("registered node: {}", id);
nc.sendRegistrationResult(params, null);
ccs.getContext().notifyNodeJoin(id, ncConfiguration);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 9d755a0..d92727c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -22,6 +22,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.concurrent.Executors;
import org.apache.hyracks.api.comm.NetworkAddress;
@@ -174,9 +175,8 @@
when(ncState.getDataPort()).thenReturn(dataAddr);
when(ncState.getResultPort()).thenReturn(resultAddr);
when(ncState.getMessagingPort()).thenReturn(msgAddr);
- NCConfig ncConfig = new NCConfig(nodeId);
- ncConfig.setDataPublicAddress(ipAddr);
- when(ncState.getNCConfig()).thenReturn(ncConfig);
+ when(ncState.getConfig())
+ .thenReturn(Collections.singletonMap(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable(), ipAddr));
Mockito.when(ncState.getNodeController()).thenReturn(ncProxy);
return ncState;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
index 1926da6..c37acab 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
@@ -21,6 +21,7 @@
import static org.apache.hyracks.control.common.utils.ConfigurationUtil.toPathElements;
import static org.apache.hyracks.util.JSONUtil.put;
+import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -28,9 +29,9 @@
import java.util.Set;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.SerializedOption;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
@@ -42,7 +43,9 @@
private static final int RRD_SIZE = 720;
- private final NCConfig ncConfig;
+ private final String nodeId;
+
+ private final Map<SerializedOption, Object> config;
private final NetworkAddress dataPort;
@@ -145,7 +148,9 @@
private NodeCapacity capacity;
public NodeControllerData(NodeRegistration reg) {
- ncConfig = reg.getNCConfig();
+ nodeId = reg.getNodeId();
+ config = Collections.unmodifiableMap(reg.getConfig());
+
dataPort = reg.getDataPort();
resultPort = reg.getResultPort();
messagingPort = reg.getMessagingPort();
@@ -252,8 +257,8 @@
return System.nanoTime() - lastHeartbeatNanoTime;
}
- public NCConfig getNCConfig() {
- return ncConfig;
+ public Map<SerializedOption, Object> getConfig() {
+ return config;
}
public Set<JobId> getActiveJobIds() {
@@ -279,7 +284,7 @@
public synchronized ObjectNode toSummaryJSON() {
ObjectMapper om = new ObjectMapper();
ObjectNode o = om.createObjectNode();
- put(o, "node-id", ncConfig.getNodeId());
+ put(o, "node-id", nodeId);
put(o, "heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
put(o, "system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
@@ -290,7 +295,7 @@
ObjectMapper om = new ObjectMapper();
ObjectNode o = om.createObjectNode();
- put(o, "node-id", ncConfig.getNodeId());
+ put(o, "node-id", nodeId);
if (includeConfig) {
put(o, "os-name", osName);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index 1dae48c..44fa0bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -52,6 +52,7 @@
import org.apache.hyracks.api.config.IConfigurator;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.config.SerializedOption;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
import org.apache.logging.log4j.Level;
@@ -210,6 +211,10 @@
this.versionString = versionString;
}
+ public IOption lookupOption(SerializedOption option) {
+ return lookupOption(option.section().sectionName(), IOption.toIni(option.optionName()));
+ }
+
public IOption lookupOption(String section, String key) {
Map<String, IOption> map = getSectionOptionMap(Section.parseSectionName(section));
return map == null ? null : map.get(key);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
index 4fa9b56..1cc739c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
@@ -34,6 +34,8 @@
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.config.SerializedOption;
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.ini4j.Ini;
import org.kohsuke.args4j.CmdLineException;
@@ -150,23 +152,21 @@
return value;
}
- public static String getString(Ini ini, org.apache.hyracks.api.config.Section section, IOption option,
- String defaultValue) {
+ public static String getString(Ini ini, Section section, IOption option, String defaultValue) {
return getString(ini, section.sectionName(), option.ini(), defaultValue);
}
- public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg,
- org.apache.hyracks.api.config.Section... sections) {
+ public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg, Section... sections) {
ArrayNode configArray = o.putArray("config");
- for (org.apache.hyracks.api.config.Section section : cfg.getSections(Arrays.asList(sections)::contains)) {
+ for (Section section : cfg.getSections(Arrays.asList(sections)::contains)) {
ObjectNode sectionNode = configArray.addObject();
Map<String, Object> sectionConfig = getSectionOptionsForJSON(cfg, section, option -> true);
sectionNode.put("section", section.sectionName()).putPOJO("properties", sectionConfig);
}
}
- public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg,
- org.apache.hyracks.api.config.Section section, Predicate<IOption> selector) {
+ public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg, Section section,
+ Predicate<IOption> selector) {
Map<String, Object> sectionConfig = new TreeMap<>();
for (IOption option : cfg.getOptions(section)) {
if (selector.test(option)) {
@@ -175,4 +175,28 @@
}
return sectionConfig;
}
+
+ public static void addConfigToJSON(ObjectNode o, Map<SerializedOption, Object> config, ConfigManager configManager,
+ Section... sections) {
+ ArrayNode configArray = o.putArray("config");
+ for (Section section : sections) {
+ ObjectNode sectionNode = configArray.addObject();
+ Map<String, Object> sectionConfig = new TreeMap<>();
+ config.entrySet().stream().filter(e -> e.getKey().section().equals(section)).forEach(entry -> sectionConfig
+ .put(IOption.toIni(entry.getKey().optionName()), fixupValueForJSON(entry, configManager)));
+ if (!sectionConfig.isEmpty()) {
+ sectionNode.put("section", section.sectionName()).putPOJO("properties", sectionConfig);
+ }
+ }
+ }
+
+ private static Object fixupValueForJSON(Map.Entry<SerializedOption, Object> entry, ConfigManager configManager) {
+ IOption option = configManager.lookupOption(entry.getKey());
+ if (option != null) {
+ // use the type system for the option to serialize this
+ return option.type().serializeToJSON(entry.getValue());
+ }
+ // not much we can do, let default JSON serialization do its thing
+ return entry.getValue();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index d41350f..acfa394 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,7 +18,6 @@
*/
package org.apache.hyracks.control.common.controllers;
-import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
@@ -260,10 +259,18 @@
}
public NCConfig(String nodeId, ConfigManager configManager) {
+ this(nodeId, configManager, true);
+ }
+
+ public NCConfig(String nodeId, ConfigManager configManager, boolean selfRegister) {
super(configManager);
this.appConfig = nodeId == null ? configManager.getAppConfig() : configManager.getNodeEffectiveConfig(nodeId);
- configManager.register(Option.class);
- configManager.register(ControllerConfig.Option.class);
+ if (selfRegister) {
+ configManager.register(Option.class);
+ configManager.register(ControllerConfig.Option.class);
+ } else {
+ configManager.register(Option.NODE_ID);
+ }
setNodeId(nodeId);
this.nodeId = nodeId;
configManager.registerArgsListener(appArgs::addAll);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
index d9165e1..e78a423 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
@@ -20,7 +20,7 @@
import java.io.Serializable;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
public class NodeParameters implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index b4d835d..f76d9b8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -21,13 +21,20 @@
import static org.apache.hyracks.util.MXHelper.osMXBean;
import static org.apache.hyracks.util.MXHelper.runtimeMXBean;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.SerializedOption;
import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
import org.apache.hyracks.util.MXHelper;
import org.apache.hyracks.util.PidHelper;
@@ -39,6 +46,8 @@
private final String nodeId;
+ @Deprecated // required for binary backward-compatibility when registering with a 0.9.4 CC
+ @SuppressWarnings("unused")
private final NCConfig ncConfig;
private final NetworkAddress dataPort;
@@ -77,11 +86,12 @@
private final NodeCapacity capacity;
+ private final HashMap<SerializedOption, Object> config;
+
public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
NetworkAddress resultPort, HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
- this.ncConfig = ncConfig;
this.dataPort = dataPort;
this.resultPort = resultPort;
this.hbSchema = hbSchema;
@@ -100,6 +110,12 @@
this.inputArguments = runtimeMXBean.getInputArguments();
this.systemProperties = runtimeMXBean.getSystemProperties();
this.pid = PidHelper.getPid();
+ IApplicationConfig cfg = ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId);
+ this.config = new HashMap<>();
+ for (IOption option : cfg.getOptions()) {
+ config.put(option.toSerializable(), cfg.get(option));
+ }
+ this.ncConfig = null;
}
public InetSocketAddress getNodeControllerAddress() {
@@ -114,8 +130,8 @@
return capacity;
}
- public NCConfig getNCConfig() {
- return ncConfig;
+ public Map<SerializedOption, Object> getConfig() {
+ return config;
}
public NetworkAddress getDataPort() {
@@ -185,4 +201,9 @@
public int getPid() {
return pid;
}
-}
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ JavaSerializationUtils.readObject(in, this);
+ }
+
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
index 3fac7da..4f27ec1 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
@@ -24,9 +24,10 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
import org.apache.hyracks.api.client.IHyracksClientInterface;
import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index 2c7e82e..faae14b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -40,7 +40,7 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.IHyracksClientInterface;
import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.deployment.DeploymentId;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
index c4263d2..439f230 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -18,8 +18,11 @@
*/
package org.apache.hyracks.ipc.impl;
+import static org.apache.hyracks.api.util.JavaSerializationUtils.getSerializationProvider;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
@@ -28,6 +31,7 @@
import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
+
@Override
public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
return deserialize(buffer, length);
@@ -48,21 +52,23 @@
return serialize(exception);
}
- public static void serialize(OutputStream out, Object object) throws Exception {
- ObjectOutputStream oos = new ObjectOutputStream(out);
- oos.writeObject(object);
- oos.flush();
+ public static void serialize(OutputStream out, Object object) throws IOException {
+ try (ObjectOutputStream oos = getSerializationProvider().newObjectOutputStream(out)) {
+ oos.writeObject(object);
+ oos.flush();
+ }
}
private Object deserialize(ByteBuffer buffer, int length) throws Exception {
- ObjectInputStream ois =
- new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(), length));
- Object object = ois.readObject();
- ois.close();
+ Object object;
+ try (ObjectInputStream ois = getSerializationProvider()
+ .newObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(), length))) {
+ object = ois.readObject();
+ }
return object;
}
- private byte[] serialize(Object object) throws Exception {
+ private byte[] serialize(Object object) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serialize(baos, object);
baos.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 7d5beff..cdd3ea2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.dataflow;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,7 @@
import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.util.CompatibilityUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -133,4 +135,15 @@
json.putPOJO("btreeFields", btreeFields);
json.putPOJO("compressorDecompressorFactory", compressorDecompressorFactory.toJson(registry));
}
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ // compat w/ 0.3.4
+ if (compressorDecompressorFactory == null) {
+ CompatibilityUtil.writeField(this, "compressorDecompressorFactory",
+ NoOpCompressorDecompressorFactory.INSTANCE);
+ }
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index ea41c3d..740b3d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.dataflow;
+import java.io.IOException;
import java.util.Map;
import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
@@ -33,6 +34,8 @@
import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResourceFactory;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.util.CompatibilityUtil;
public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
@@ -69,4 +72,14 @@
filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory,
metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable, compressorDecompressorFactory);
}
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ // compat w/ 0.3.4
+ if (compressorDecompressorFactory == null) {
+ CompatibilityUtil.writeField(this, "compressorDecompressorFactory",
+ NoOpCompressorDecompressorFactory.INSTANCE);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
index 690f4a2..90a4b56 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.storage.common.compression;
+import java.io.ObjectStreamException;
+
import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -26,7 +28,7 @@
import com.fasterxml.jackson.databind.JsonNode;
-public class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory {
+public final class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory {
private static final long serialVersionUID = 1L;
public static final ICompressorDecompressorFactory INSTANCE = new NoOpCompressorDecompressorFactory();
@@ -44,4 +46,8 @@
public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
return INSTANCE;
}
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java
new file mode 100644
index 0000000..392955a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CompatibilityUtil {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private CompatibilityUtil() {
+ }
+
+ public static Object readField(Object obj, String fieldName) throws IOException {
+ Class<?> objClass = obj.getClass();
+ LOGGER.debug("reading field '{}' on object of type {}", fieldName, objClass);
+ try {
+ Field f = objClass.getDeclaredField(fieldName);
+ f.setAccessible(true);
+ return f.get(obj);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ LOGGER.warn("exception reading field '{}' on object of type {}", fieldName, objClass, e);
+ throw new IOException(e);
+ }
+ }
+
+ public static void writeField(Object obj, String fieldName, Object newValue) throws IOException {
+ Class<?> objClass = obj.getClass();
+ LOGGER.debug("updating field '{}' on object of type {} to {}", fieldName, objClass, newValue);
+ try {
+ Field f = objClass.getDeclaredField(fieldName);
+ f.setAccessible(true);
+ f.set(obj, newValue);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ LOGGER.warn("exception updating field '{}' object of type {} to {}", fieldName, objClass, newValue, e);
+ throw new IOException(e);
+ }
+ }
+}