Clean up compilation warnings.
Change-Id: Idbfcd9c67f91d373c5f7269125778a5681021227
Reviewed-on: https://asterix-gerrit.ics.uci.edu/505
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 6f3078e..92cd61a 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -42,7 +42,6 @@
private final static long SEED = 0L;
- private final IHyracksTaskContext hyracksTaskCtx;
private final ITransactionManager transactionManager;
private final ILogManager logMgr;
private final JobId jobId;
@@ -59,7 +58,6 @@
public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction) {
- this.hyracksTaskCtx = ctx;
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 3e2d26d..0a66013 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -21,11 +21,9 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
-import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.asterix.metadata.declared.AqlSourceId;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -35,8 +33,8 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
public class AnalysisUtil {
/*
@@ -133,6 +131,7 @@
}
private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<FunctionIdentifier>();
+
static {
fieldAccessFunctions.add(AsterixBuiltinFunctions.GET_DATA);
fieldAccessFunctions.add(AsterixBuiltinFunctions.GET_HANDLE);
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
index b118fb7..35efcb1 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
@@ -128,8 +128,8 @@
String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
IFeedLoadManager loadManager = CentralFeedManager.getInstance().getFeedLoadManager();
- FeedConnectionId connectionId = new FeedConnectionId(new FeedId(activity.getDataverseName(),
- activity.getFeedName()), activity.getDatasetName());
+ FeedConnectionId connectionId = new FeedConnectionId(
+ new FeedId(activity.getDataverseName(), activity.getFeedName()), activity.getDatasetName());
int intakeRate = loadManager.getOutflowRate(connectionId, FeedRuntimeType.COLLECT) * intake.split(",").length;
int storeRate = loadManager.getOutflowRate(connectionId, FeedRuntimeType.STORE) * store.split(",").length;
@@ -158,13 +158,7 @@
html.append("</tr>");
}
- private String insertLink(StringBuilder html, String url, String displayText) {
- return ("<a href=\"" + url + "\">" + displayText + "</a>");
- }
-
private String insertColoredText(String s, String color) {
return "<font color=\"" + color + "\">" + s + "</font>";
}
}
-
-
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
index fe20f51..f1473f4 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
@@ -59,6 +59,8 @@
ch = (char) in.read();
}
buffer.flip();
+ sc.close();
+
String s = new String(buffer.array());
int feedSubscriptionPort = Integer.parseInt(s.trim());
if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
index f5b5cbb..7585fd3 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
@@ -30,11 +30,9 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.io.IOUtils;
-
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.result.ResultUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.hyracks.api.client.IHyracksClientConnection;
public class ShutdownAPIServlet extends HttpServlet {
@@ -42,23 +40,14 @@
private static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
- private static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
-
@Override
- protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
- IOException {
+ protected void doPost(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
PrintWriter out = response.getWriter();
- OutputFormat format = OutputFormat.LOSSLESS_JSON;
- String accept = request.getHeader("Accept");
- if ((accept == null) || (accept.contains("application/x-adm"))) {
- format = OutputFormat.ADM;
- } else if (accept.contains("application/json")) {
- format = OutputFormat.LOSSLESS_JSON;
- }
StringWriter sw = new StringWriter();
IOUtils.copy(request.getInputStream(), sw, StandardCharsets.UTF_8.name());
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
index f98bb06..424421a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
@@ -18,24 +18,20 @@
*/
package org.apache.asterix.api.http.servlet;
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.common.config.AsterixBuildProperties;
-import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hadoop.http.HttpServer;
-import org.json.JSONObject;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Map;
+
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.json.JSONObject;
public class VersionAPIServlet extends HttpServlet {
-
+ private static final long serialVersionUID = 1L;
public static final String ASTERIX_BUILD_PROP_ATTR = "org.apache.asterix.PROPS";
@Override
@@ -45,7 +41,7 @@
Map<String, String> buildProperties = props.getBuildProperties().getAllProps();
JSONObject responseObject = new JSONObject(buildProperties);
response.setCharacterEncoding("utf-8");
- PrintWriter responseWriter = response.getWriter();
+ PrintWriter responseWriter = response.getWriter();
responseWriter.write(responseObject.toString());
response.setStatus(HttpServletResponse.SC_OK);
responseWriter.flush();
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
index 202451b..d53428d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
@@ -146,7 +146,7 @@
}
/*
- * Traverse job specification to categorize job as a feed intake job or a feed collection job
+ * Traverse job specification to categorize job as a feed intake job or a feed collection job
*/
@Override
public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
@@ -333,6 +333,8 @@
}
}
break;
+ default:
+ break;
}
}
@@ -473,10 +475,12 @@
return feedJobNotificationHandler.getFeedJoint(feedJointKey);
}
+ @Override
public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
feedJobNotificationHandler.registerFeedEventSubscriber(connectionId, subscriber);
}
+ @Override
public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
feedJobNotificationHandler.deregisterFeedEventSubscriber(connectionId, subscriber);
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
index 4acc1d4..81cabeb 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
@@ -103,6 +103,8 @@
case STRING:
sb.append("\"" + ((AString) record.getValueByPos(i)).getStringValue() + "\"");
break;
+ default:
+ break;
}
}
sb.append(" }");
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
index d3ee4ef..3686a03 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
@@ -100,8 +100,8 @@
}
}
if (unsubstitutedNodes.size() > 0) {
- String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(
- new String[] {});
+ String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes()
+ .toArray(new String[] {});
nodeIndex = 0;
for (String unsubstitutedNode : unsubstitutedNodes) {
nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
@@ -226,6 +226,8 @@
}
}
break;
+ default:
+ break;
}
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
index ff5e591..51d857a 100755
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
@@ -100,8 +100,8 @@
return uninstalledLibs;
}
- private static boolean uninstallLibrary(String dataverse, String libraryName) throws AsterixException,
- RemoteException, ACIDException {
+ private static boolean uninstallLibrary(String dataverse, String libraryName)
+ throws AsterixException, RemoteException, ACIDException {
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -120,8 +120,8 @@
.getDataverseFunctions(mdTxnCtx, dataverse);
for (org.apache.asterix.metadata.entities.Function function : functions) {
if (function.getName().startsWith(libraryName + "#")) {
- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, new FunctionSignature(dataverse,
- function.getName(), function.getArity()));
+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx,
+ new FunctionSignature(dataverse, function.getName(), function.getArity()));
}
}
@@ -156,8 +156,8 @@
MetadataManager.INSTANCE.acquireWriteLatch();
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- org.apache.asterix.metadata.entities.Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(
- mdTxnCtx, dataverse, libraryName);
+ org.apache.asterix.metadata.entities.Library libraryInMetadata = MetadataManager.INSTANCE
+ .getLibrary(mdTxnCtx, dataverse, libraryName);
if (libraryInMetadata != null && !wasUninstalled) {
return;
}
@@ -190,9 +190,9 @@
args.add(arg);
}
org.apache.asterix.metadata.entities.Function f = new org.apache.asterix.metadata.entities.Function(
- dataverse, libraryName + "#" + function.getName().trim(), args.size(), args, function
- .getReturnType().trim(), function.getDefinition().trim(), library.getLanguage()
- .trim(), function.getFunctionType().trim());
+ dataverse, libraryName + "#" + function.getName().trim(), args.size(), args,
+ function.getReturnType().trim(), function.getDefinition().trim(),
+ library.getLanguage().trim(), function.getFunctionType().trim());
MetadataManager.INSTANCE.addFunction(mdTxnCtx, f);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Installed function: " + libraryName + "#" + function.getName().trim());
@@ -221,8 +221,8 @@
LOGGER.info("Installed adapters contain in library :" + libraryName);
}
- MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new org.apache.asterix.metadata.entities.Library(dataverse,
- libraryName));
+ MetadataManager.INSTANCE.addLibrary(mdTxnCtx,
+ new org.apache.asterix.metadata.entities.Library(dataverse, libraryName));
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Added library " + libraryName + "to Metadata");
@@ -240,8 +240,8 @@
}
}
- private static void registerLibrary(String dataverse, String libraryName, boolean isMetadataNode, File installLibDir)
- throws Exception {
+ private static void registerLibrary(String dataverse, String libraryName, boolean isMetadataNode,
+ File installLibDir) throws Exception {
ClassLoader classLoader = getLibraryClassLoader(dataverse, libraryName);
ExternalLibraryManager.registerLibraryClassLoader(dataverse, libraryName, classLoader);
}
@@ -261,8 +261,10 @@
+ " Install Directory: " + installDir.getAbsolutePath());
}
- File libDir = new File(installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName);
+ File libDir = new File(
+ installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName);
FilenameFilter jarFileFilter = new FilenameFilter() {
+ @Override
public boolean accept(File dir, String name) {
return name.endsWith(".jar");
}
@@ -288,12 +290,12 @@
ClassLoader parentClassLoader = ExternalLibraryBootstrap.class.getClassLoader();
URL[] urls = new URL[numDependencies];
int count = 0;
- urls[count++] = libJar.toURL();
+ urls[count++] = libJar.toURI().toURL();
if (libraryDependencies != null && libraryDependencies.length > 0) {
for (String dependency : libraryDependencies) {
File file = new File(libDependencyDir + File.separator + dependency);
- urls[count++] = file.toURL();
+ urls[count++] = file.toURI().toURL();
}
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java b/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
index 391f61b..5ebbe2c 100644
--- a/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
+++ b/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
@@ -30,15 +30,11 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.http.ParseException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
import org.apache.asterix.api.common.SessionConfig;
import org.apache.asterix.api.common.SessionConfig.OutputFormat;
import org.apache.asterix.api.http.servlet.APIServlet;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.http.ParseException;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -46,6 +42,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
public class ResultUtils {
private static final Charset UTF_8 = Charset.forName("UTF-8");
@@ -92,8 +91,7 @@
public static FrameManager resultDisplayFrameMgr = new FrameManager(ResultReader.FRAME_SIZE);
- public static void displayResults(ResultReader resultReader, SessionConfig conf)
- throws HyracksDataException {
+ public static void displayResults(ResultReader resultReader, SessionConfig conf) throws HyracksDataException {
IFrameTupleAccessor fta = resultReader.getFrameTupleAccessor();
IFrame frame = new VSizeFrame(resultDisplayFrameMgr);
@@ -107,8 +105,8 @@
// If we're outputting CSV with a header, the HTML header was already
// output by displayCSVHeader(), so skip it here
- if (conf.is(SessionConfig.FORMAT_HTML) &&
- ! (conf.fmt() == OutputFormat.CSV && conf.is(SessionConfig.FORMAT_CSV_HEADER))) {
+ if (conf.is(SessionConfig.FORMAT_HTML)
+ && !(conf.fmt() == OutputFormat.CSV && conf.is(SessionConfig.FORMAT_CSV_HEADER))) {
conf.out().println("<h4>Results:</h4>");
conf.out().println("<pre>");
}
@@ -125,6 +123,8 @@
wrap_array = true;
}
break;
+ default:
+ break;
}
if (bytesRead > 0) {
@@ -140,7 +140,7 @@
byte[] recordBytes = new byte[length];
int numread = bbis.read(recordBytes, 0, length);
if (conf.fmt() == OutputFormat.CSV) {
- if ( (numread > 0) && (recordBytes[numread-1] == '\n') ) {
+ if ((numread > 0) && (recordBytes[numread - 1] == '\n')) {
numread--;
}
}
@@ -270,7 +270,7 @@
//try returning the class without package qualification
String exceptionClassName = hierarchySplits[hierarchySplits.length - 1];
String localizedMessage = cause.getLocalizedMessage();
- if(localizedMessage == null){
+ if (localizedMessage == null) {
localizedMessage = "Internal error. Please check instance logs for further details.";
}
return localizedMessage + " [" + exceptionClassName + "]";
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
index 6d2de65..9961dc8 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
@@ -26,17 +26,13 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
-
/**
* Manages a Mini (local VM) HDFS cluster with a configured number of datanodes.
*
* @author ramangrover29
*/
-@SuppressWarnings("deprecation")
public class HDFSCluster {
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
@@ -68,7 +64,7 @@
conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
cleanupLocal();
- //this constructor is deprecated in hadoop 2x
+ //this constructor is deprecated in hadoop 2x
//dfsCluster = new MiniDFSCluster(nameNodePort, conf, numDataNodes, true, true, StartupOption.REGULAR, null);
MiniDFSCluster.Builder build = new MiniDFSCluster.Builder(conf);
build.nameNodePort(nameNodePort);
@@ -104,28 +100,4 @@
}
}
- public static void main(String[] args) throws Exception {
- HDFSCluster cluster = new HDFSCluster();
- cluster.setup();
- JobConf conf = configureJobConf();
- FileSystem fs = FileSystem.get(conf);
- InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
- for (InputSplit split : inputSplits) {
- System.out.println("split :" + split);
- }
- // cluster.cleanup();
- }
-
- private static JobConf configureJobConf() throws Exception {
- JobConf conf = new JobConf();
- String hdfsUrl = "hdfs://127.0.0.1:31888";
- String hdfsPath = "/asterix/extrasmalltweets.txt";
- conf.set("fs.default.name", hdfsUrl);
- conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- conf.setClassLoader(HDFSAdapter.class.getClassLoader());
- conf.set("mapred.input.dir", hdfsPath);
- conf.set("mapred.input.format.class", "org.apache.hadoop.mapred.TextInputFormat");
- return conf;
- }
-
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/annotations/RecordDataGenAnnotation.java b/asterix-common/src/main/java/org/apache/asterix/common/annotations/RecordDataGenAnnotation.java
index e22e85c..f478a4e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/annotations/RecordDataGenAnnotation.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/annotations/RecordDataGenAnnotation.java
@@ -21,7 +21,7 @@
import java.io.Serializable;
public class RecordDataGenAnnotation implements IRecordTypeAnnotation, Serializable {
-
+ private static final long serialVersionUID = 1L;
private final IRecordFieldDataGen[] declaredFieldsDatagen;
private final UndeclaredFieldsDataGen undeclaredFieldsDataGen;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/org/apache/asterix/common/api/AsterixThreadExecutor.java
index 24a260f..befeb48 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/AsterixThreadExecutor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/AsterixThreadExecutor.java
@@ -37,7 +37,7 @@
executorService.execute(command);
}
- public Future<Object> submit(Callable command) {
- return (Future<Object>) executorService.submit(command);
+ public Future<? extends Object> submit(Callable<? extends Object> command) {
+ return executorService.submit(command);
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixBuildProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixBuildProperties.java
index e065bec..3ff4ac3 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixBuildProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixBuildProperties.java
@@ -20,7 +20,6 @@
package org.apache.asterix.common.config;
import java.util.Map;
-import java.util.Properties;
public class AsterixBuildProperties extends AbstractAsterixProperties {
@@ -49,7 +48,8 @@
}
public String getCommitIdDescribeShort() {
- return accessor.getProperty("git.commit.id.describe-short", "", PropertyInterpreters.getStringPropertyInterpreter());
+ return accessor.getProperty("git.commit.id.describe-short", "",
+ PropertyInterpreters.getStringPropertyInterpreter());
}
public String getCommitUserEmail() {
@@ -69,7 +69,8 @@
}
public String getCommitMessageShort() {
- return accessor.getProperty("git.commit.message.short", "", PropertyInterpreters.getStringPropertyInterpreter());
+ return accessor.getProperty("git.commit.message.short", "",
+ PropertyInterpreters.getStringPropertyInterpreter());
}
public String getShortCommitId() {
@@ -85,7 +86,8 @@
}
public String getClosestTagCommitCount() {
- return accessor.getProperty("git.closest.tag.commit.count", "", PropertyInterpreters.getStringPropertyInterpreter());
+ return accessor.getProperty("git.closest.tag.commit.count", "",
+ PropertyInterpreters.getStringPropertyInterpreter());
}
public String getCommitIdDescribe() {
@@ -107,7 +109,8 @@
public String getCommitUserName() {
return accessor.getProperty("git.commit.user.name", "", PropertyInterpreters.getStringPropertyInterpreter());
}
- public Map<String, String> getAllProps(){
+
+ public Map<String, String> getAllProps() {
return accessor.getBuildProperties();
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 647b032..d6c81ab 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -22,14 +22,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Properties;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
@@ -99,11 +99,14 @@
for (TransactionLogDir txnLogDir : asterixConfiguration.getTransactionLogDir()) {
transactionLogDirs.put(txnLogDir.getNcId(), txnLogDir.getTxnLogDirPath());
}
- Properties p = new Properties();
+ Properties gitProperties = new Properties();
try {
- p.load(getClass().getClassLoader().getResourceAsStream("git.properties"));
- asterixBuildProperties = new HashMap<String, String>((Map)p);
- } catch(IOException e) {
+ gitProperties.load(getClass().getClassLoader().getResourceAsStream("git.properties"));
+ asterixBuildProperties = new HashMap<String, String>();
+ for (final String name : gitProperties.stringPropertyNames()) {
+ asterixBuildProperties.put(name, gitProperties.getProperty(name));
+ }
+ } catch (IOException e) {
throw new AsterixException(e);
}
@@ -137,7 +140,7 @@
return coredumpConfig;
}
- public Map<String, String> getBuildProperties(){
+ public Map<String, String> getBuildProperties() {
return asterixBuildProperties;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/TransactionSubsystemProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/context/TransactionSubsystemProvider.java
index 2d63c05..e464667 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/TransactionSubsystemProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/TransactionSubsystemProvider.java
@@ -29,6 +29,8 @@
* while at the same time the AsterixAppRuntimeContext depends on asterix-transactions for the TransactionSubsystem.
*/
public class TransactionSubsystemProvider implements ITransactionSubsystemProvider {
+ private static final long serialVersionUID = 1L;
+
@Override
public ITransactionSubsystem getTransactionSubsystem(IHyracksTaskContext ctx) {
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
index 333e58e..9865501 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
/**
* Represents the feed runtime that collects feed tuples from another feed.
@@ -40,8 +39,8 @@
private FeedFrameCollector frameCollector;
public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter,
- ISubscribableRuntime sourceRuntime, Map<String, String> feedPolicy) {
+ FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter, ISubscribableRuntime sourceRuntime,
+ Map<String, String> feedPolicy) {
super(runtimeId, inputSideHandler, outputSideWriter);
this.connectionId = connectionId;
this.sourceRuntime = sourceRuntime;
@@ -64,6 +63,7 @@
|| frameCollector.getState().equals(FeedFrameCollector.State.HANDOVER);
}
+ @Override
public void setMode(Mode mode) {
getInputHandler().setMode(mode);
}
@@ -85,6 +85,7 @@
this.frameCollector = frameCollector;
}
+ @Override
public FeedFrameCollector getFrameCollector() {
return frameCollector;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
index 9dbca15..f5f89f7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
@@ -58,10 +58,11 @@
/** The value of the partition 'i' if this is the i'th instance of the associated operator **/
private final int partition;
- public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer, FeedRuntimeType feedRuntimeType,
- int partition, FrameTupleAccessor fta, IFeedManager feedManager) throws IOException {
+ public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
+ FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta, IFeedManager feedManager)
+ throws IOException {
this.feedId = feedId;
- this.frameDistributor = new FrameDistributor(ctx, feedId, feedRuntimeType, partition, true,
+ this.frameDistributor = new FrameDistributor(feedId, feedRuntimeType, partition, true,
feedManager.getFeedMemoryManager(), fta);
this.feedRuntimeType = feedRuntimeType;
this.partition = partition;
@@ -86,8 +87,8 @@
public void unsubscribeFeed(IFrameWriter recipientFeedFrameWriter) throws Exception {
boolean success = frameDistributor.deregisterFrameCollector(recipientFeedFrameWriter);
if (!success) {
- throw new IllegalStateException("Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter
- + " not registered.");
+ throw new IllegalStateException(
+ "Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter + " not registered.");
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
index 1c9acf6..8609366 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
@@ -23,13 +23,10 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
public class FeedFrameDiscarder {
private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
- private final IHyracksTaskContext ctx;
private final FeedRuntimeInputHandler inputHandler;
private final FeedConnectionId connectionId;
private final FeedRuntimeId runtimeId;
@@ -37,9 +34,8 @@
private final float maxFractionDiscard;
private int nDiscarded;
- public FeedFrameDiscarder(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- FeedPolicyAccessor policyAccessor, FeedRuntimeInputHandler inputHandler) throws IOException {
- this.ctx = ctx;
+ public FeedFrameDiscarder(FeedConnectionId connectionId, FeedRuntimeId runtimeId, FeedPolicyAccessor policyAccessor,
+ FeedRuntimeInputHandler inputHandler) throws IOException {
this.connectionId = connectionId;
this.runtimeId = runtimeId;
this.policyAccessor = policyAccessor;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
index 520912b..c4a2ce0 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
@@ -128,7 +128,8 @@
private final Collection<FeedFrameCollector> frameCollectors;
- public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType, int partition) {
+ public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType,
+ int partition) {
this.frameCollectors = frameCollectors;
}
@@ -162,13 +163,11 @@
public static class DiskSpiller implements IFeedFrameHandler {
- private final FeedId feedId;
private FrameSpiller<ByteBuffer> receiver;
private Iterator<ByteBuffer> iterator;
public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
int frameSize) throws IOException {
- this.feedId = feedId;
receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
}
@@ -179,7 +178,6 @@
private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
- private final int frameSize;
private final FeedId feedId;
private BufferedOutputStream bos;
private final ByteBuffer reusableLengthBuffer;
@@ -191,7 +189,6 @@
public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
this.feedId = feedId;
- this.frameSize = frameSize;
this.frameDistributor = distributor;
reusableLengthBuffer = ByteBuffer.allocate(4);
reusableDataBuffer = ByteBuffer.allocate(frameSize);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
index 982fb7c..7b76692 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
@@ -32,14 +32,12 @@
private static final int UNKNOWN = -1;
- private final String nodeId;
private final AtomicInteger globalSenderId = new AtomicInteger(1);
private final Map<Integer, Sender> senders = new HashMap<Integer, Sender>();
private final Map<Integer, Series> statHistory = new HashMap<Integer, Series>();
private final Map<String, Sender> sendersByName = new HashMap<String, Sender>();
public FeedMetricCollector(String nodeId) {
- this.nodeId = nodeId;
}
@Override
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
index cafc699..6642df1 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
@@ -68,8 +68,8 @@
private FrameEventCallback frameEventCallback;
- public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId, IFrameWriter coreOperator,
- FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
+ public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws IOException {
this.connectionId = connectionId;
this.runtimeId = runtimeId;
@@ -77,17 +77,17 @@
this.bufferingEnabled = bufferingEnabled;
this.feedPolicyAccessor = fpa;
this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
- this.discarder = new FeedFrameDiscarder(ctx, connectionId, runtimeId, fpa, this);
+ this.discarder = new FeedFrameDiscarder(connectionId, runtimeId, fpa, this);
this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
this.mode = Mode.PROCESS;
this.lastMode = Mode.PROCESS;
this.finished = false;
this.fpa = fpa;
this.feedManager = feedManager;
- this.pool = (DataBucketPool) feedManager.getFeedMemoryManager().getMemoryComponent(
- IFeedMemoryComponent.Type.POOL);
- this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager().getMemoryComponent(
- IFeedMemoryComponent.Type.COLLECTION);
+ this.pool = (DataBucketPool) feedManager.getFeedMemoryManager()
+ .getMemoryComponent(IFeedMemoryComponent.Type.POOL);
+ this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
+ .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
@@ -96,6 +96,7 @@
this.throttlingEnabled = false;
}
+ @Override
public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
try {
switch (mode) {
@@ -159,8 +160,8 @@
LOGGER.info("Bufferring data until recovery is complete " + this.runtimeId);
}
if (frameCollection == null) {
- this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager().getMemoryComponent(
- IFeedMemoryComponent.Type.COLLECTION);
+ this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
+ .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
}
if (frameCollection == null) {
discarder.processMessage(frame);
@@ -338,6 +339,7 @@
}
}
+ @Override
public void close() {
if (mBuffer != null) {
boolean disableMonitoring = !this.mode.equals(Mode.STALL);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedStorageStatistics.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedStorageStatistics.java
deleted file mode 100644
index 156dcc0..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedStorageStatistics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-public class FeedStorageStatistics {
-
- private long avgPersistenceDelay;
- private long lastIntakeTimestamp;
-
-
-}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
index f859bfc..9e106fb 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
@@ -29,7 +29,6 @@
import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -39,7 +38,6 @@
private static final long MEMORY_AVAILABLE_POLL_PERIOD = 1000; // 1 second
- private final IHyracksTaskContext ctx;
private final FeedId feedId;
private final FeedRuntimeType feedRuntimeType;
private final int partition;
@@ -73,10 +71,9 @@
INACTIVE
}
- public FrameDistributor(IHyracksTaskContext ctx, FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
+ public FrameDistributor(FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
boolean enableSynchronousTransfer, IFeedMemoryManager memoryManager, FrameTupleAccessor fta)
- throws HyracksDataException {
- this.ctx = ctx;
+ throws HyracksDataException {
this.feedId = feedId;
this.feedRuntimeType = feedRuntimeType;
this.partition = partition;
@@ -140,16 +137,16 @@
}
evaluateIfSpillIsEnabled();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id "
- + feedId);
+ LOGGER.info(
+ "Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id " + feedId);
}
}
public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) {
switch (distributionMode) {
case INACTIVE:
- throw new IllegalStateException("Invalid attempt to deregister frame collector in " + distributionMode
- + " mode.");
+ throw new IllegalStateException(
+ "Invalid attempt to deregister frame collector in " + distributionMode + " mode.");
case SHARED:
frameCollector.closeCollector();
registeredCollectors.remove(frameCollector.getFrameWriter());
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
index e85550d..5601f73 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
@@ -22,24 +22,20 @@
public class IntakePartitionStatistics {
- public static int ACK_WINDOW_SIZE = 1024;
- private int partition;
- private int base;
- private BitSet bitSet;
+ public static int ACK_WINDOW_SIZE = 1024;
+ private BitSet bitSet;
- public IntakePartitionStatistics(int partition, int base) {
- this.partition = partition;
- this.base = base;
- this.bitSet = new BitSet(ACK_WINDOW_SIZE);
- }
+ public IntakePartitionStatistics(int partition, int base) {
+ this.bitSet = new BitSet(ACK_WINDOW_SIZE);
+ }
- public void ackRecordId(int recordId) {
- int posIndexWithinBase = recordId % ACK_WINDOW_SIZE;
- this.bitSet.set(posIndexWithinBase);
- }
+ public void ackRecordId(int recordId) {
+ int posIndexWithinBase = recordId % ACK_WINDOW_SIZE;
+ this.bitSet.set(posIndexWithinBase);
+ }
- public byte[] getAckInfo() {
- return bitSet.toByteArray();
- }
+ public byte[] getAckInfo() {
+ return bitSet.toByteArray();
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
index 145fa47..5761944 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
@@ -57,7 +57,6 @@
protected final FeedRuntimeInputHandler inputHandler;
protected final IFrameEventCallback callback;
protected final Timer timer;
- private final RecordDescriptor recordDesc;
private final IExceptionHandler exceptionHandler;
protected final FeedPolicyAccessor policyAccessor;
protected int nPartitions;
@@ -122,7 +121,6 @@
this.callback = callback;
this.inputHandler = inputHandler;
this.timer = new Timer();
- this.recordDesc = recordDesc;
this.policyAccessor = policyAccessor;
this.nPartitions = nPartitions;
this.active = true;
@@ -156,9 +154,8 @@
this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
}
- if (monitorInputQueueLength
- && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
- || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
+ if (monitorInputQueueLength && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
+ || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
}
@@ -237,9 +234,8 @@
}
public void sendReport(ByteBuffer frame) {
- if ((logInflowOutflowRate || reportInflowRate)
- && !(inputHandler.getMode().equals(Mode.PROCESS_BACKLOG) || inputHandler.getMode().equals(
- Mode.PROCESS_SPILL))) {
+ if ((logInflowOutflowRate || reportInflowRate) && !(inputHandler.getMode().equals(Mode.PROCESS_BACKLOG)
+ || inputHandler.getMode().equals(Mode.PROCESS_SPILL))) {
inflowFta.reset(frame);
metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
index fb16d1d..6be0211 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
@@ -36,12 +36,10 @@
private static final float CPU_CHANGE_THRESHOLD = 0.2f;
private static final float HEAP_CHANGE_THRESHOLD = 0.4f;
- private final String nodeId;
private final NodeLoadReportTask task;
private final Timer timer;
public NodeLoadReportService(String nodeId, IFeedManager feedManager) {
- this.nodeId = nodeId;
this.task = new NodeLoadReportTask(nodeId, feedManager);
this.timer = new Timer();
}
@@ -58,7 +56,6 @@
private static class NodeLoadReportTask extends TimerTask {
- private final String nodeId;
private final IFeedManager feedManager;
private final NodeReportMessage message;
private final IFeedMessageService messageService;
@@ -67,7 +64,6 @@
private static MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
public NodeLoadReportTask(String nodeId, IFeedManager feedManager) {
- this.nodeId = nodeId;
this.feedManager = feedManager;
this.message = new NodeReportMessage(0.0f, 0L, 0);
this.messageService = feedManager.getFeedMessageService();
@@ -90,8 +86,10 @@
return true;
}
- boolean changeInCpu = (Math.abs(cpuLoad - message.getCpuLoad()) / message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
- boolean changeInUsedHeap = (Math.abs(usedHeap - message.getUsedHeap()) / message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
+ boolean changeInCpu = (Math.abs(cpuLoad - message.getCpuLoad())
+ / message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
+ boolean changeInUsedHeap = (Math.abs(usedHeap - message.getUsedHeap())
+ / message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
boolean changeInRuntimeSize = nRuntimes != message.getnRuntimes();
return changeInCpu || changeInUsedHeap || changeInRuntimeSize;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
index 474f0d1..d545b09 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
@@ -37,10 +37,11 @@
private boolean ackingEnabled;
private final boolean timeTrackingEnabled;
- public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
- FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
- FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
- IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+ public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+ IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+ IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+ FeedPolicyAccessor policyAccessor) {
super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
exceptionHandler, callback, nPartitions, policyAccessor);
timeTrackingEnabled = policyAccessor.isTimeTrackingEnabled();
@@ -59,6 +60,7 @@
return false;
}
+ @Override
protected boolean logInflowOutflowRate() {
return true;
}
@@ -86,7 +88,6 @@
private static final long NORMAL_WINDOW_LIMIT = 400 * 1000;
private static final long HIGH_WINDOW_LIMIT = 800 * 1000;
- private static final long LOW_WINDOW_LIMIT = 1200 * 1000;
private long delayNormalWindow = 0;
private long delayHighWindow = 0;
@@ -104,8 +105,6 @@
int nTuples = frameAccessor.getTupleCount();
long intakeTimestamp;
long currentTime = System.currentTimeMillis();
- int partition = 0;
- int recordId = 0;
for (int i = 0; i < nTuples; i++) {
int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
int openPartOffsetOrig = frame.getInt(recordStart + 6);
@@ -113,11 +112,9 @@
int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+ (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
- recordId = frame.getInt(recordStart + recordIdOffset);
int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2)
+ 1;
- partition = frame.getInt(recordStart + partitionOffset);
int intakeTimestampValueOffset = partitionOffset + 4
+ (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
index 3bc6df9..7eb5921 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
@@ -21,7 +21,7 @@
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
public class SubscribableFeedRuntimeId extends FeedRuntimeId {
-
+ private static final long serialVersionUID = 1L;
private final FeedId feedId;
public SubscribableFeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition) {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedLifecycleEventSubscriber.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedLifecycleEventSubscriber.java
index ebcf2fb..94af74b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedLifecycleEventSubscriber.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedLifecycleEventSubscriber.java
@@ -19,7 +19,6 @@
package org.apache.asterix.common.feeds.api;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedConnectJobInfo;
public interface IFeedLifecycleEventSubscriber {
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 2d03034..c92262c 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -29,18 +29,14 @@
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
-import org.kohsuke.args4j.CmdLineParser;
-
-import org.apache.asterix.event.management.DefaultOutputHandler;
import org.apache.asterix.event.management.EventUtil;
-import org.apache.asterix.event.management.AsterixEventServiceClient;
-import org.apache.asterix.event.management.IOutputHandler;
import org.apache.asterix.event.management.Randomizer;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.event.schema.cluster.Property;
import org.apache.asterix.event.schema.event.Events;
import org.apache.asterix.event.schema.pattern.Patterns;
+import org.kohsuke.args4j.CmdLineParser;
public class EventDriver {
@@ -108,7 +104,6 @@
Randomizer.getInstance(eventConfig.seed);
}
Cluster cluster = initializeCluster(eventConfig.clusterPath);
- Patterns patterns = initializePatterns(eventConfig.patternPath);
initialize(eventConfig);
if (!eventConfig.dryRun) {
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/error/OutputHandler.java b/asterix-events/src/main/java/org/apache/asterix/event/error/OutputHandler.java
index 17e0f15..68e87c6 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/error/OutputHandler.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/error/OutputHandler.java
@@ -35,6 +35,7 @@
}
+ @Override
public OutputAnalysis reportEventOutput(Event event, String output) {
EventType eventType = EventType.valueOf(event.getType().toUpperCase());
@@ -83,6 +84,8 @@
ignore = false;
}
break;
+ default:
+ break;
}
if (ignore) {
return new OutputAnalysis(true, null);
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventExecutor.java
index 95557d7..4587756 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventExecutor.java
@@ -25,15 +25,13 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.io.IOUtils;
-
-import org.apache.asterix.common.config.AsterixStorageProperties;
import org.apache.asterix.event.driver.EventDriver;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.event.schema.cluster.Property;
import org.apache.asterix.event.schema.pattern.Pattern;
import org.apache.asterix.event.service.AsterixEventServiceUtil;
+import org.apache.commons.io.IOUtils;
public class EventExecutor {
@@ -70,8 +68,9 @@
if (node.getDebugPort() != null) {
int debugPort = node.getDebugPort().intValue();
if (!javaOpts.contains("-Xdebug")) {
- builder.append((" "
- + "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort));
+ builder.append(
+ (" " + "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address="
+ + debugPort));
}
}
builder.append("\"");
@@ -88,8 +87,9 @@
if (node.getDebugPort() != null) {
int debugPort = node.getDebugPort().intValue();
if (!javaOpts.contains("-Xdebug")) {
- builder.append((" "
- + "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort));
+ builder.append(
+ (" " + "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address="
+ + debugPort));
}
}
builder.append("\"");
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/model/BackupInfo.java b/asterix-events/src/main/java/org/apache/asterix/event/model/BackupInfo.java
index 6c5a25f..3169881 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/model/BackupInfo.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/model/BackupInfo.java
@@ -25,6 +25,7 @@
import org.apache.asterix.installer.schema.conf.Hdfs;
public class BackupInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
public static enum BackupType {
LOCAL,
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index bccfa43..a74ae4d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.external.dataset.adapter.RSSFeedAdapter;
@@ -44,8 +43,6 @@
private List<String> feedURLs = new ArrayList<String>();
private static Map<String, String> topicFeeds = new HashMap<String, String>();
private ARecordType recordType;
- private FeedPolicyAccessor policyAccessor;
-
public static final String KEY_RSS_URL = "topic";
public static final String KEY_INTERVAL = "interval";
public static final String TOP_STORIES = "topstories";
@@ -98,7 +95,7 @@
@Override
public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
- String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
+ String rssURLProperty = configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
@@ -113,8 +110,8 @@
for (String topic : rssTopics) {
String feedURL = topicFeeds.get(topic);
if (feedURL == null) {
- throw new IllegalArgumentException(" unknown topic :" + topic + " please choose from the following "
- + getValidTopics());
+ throw new IllegalArgumentException(
+ " unknown topic :" + topic + " please choose from the following " + getValidTopics());
}
feedURLs.add(feedURL);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index 91b15fe..c05df33 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -23,7 +23,6 @@
import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
@@ -47,7 +46,7 @@
public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
- private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
+ private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
private ARecordType outputType;
@@ -75,8 +74,8 @@
TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
if (configuration.get(SearchAPIConstants.QUERY) == null) {
- throw new AsterixException("parameter " + SearchAPIConstants.QUERY
- + " not specified as part of adaptor configuration");
+ throw new AsterixException(
+ "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
}
String interval = configuration.get(SearchAPIConstants.INTERVAL);
@@ -84,8 +83,8 @@
try {
Integer.parseInt(interval);
} catch (NumberFormatException nfe) {
- throw new IllegalArgumentException("parameter " + SearchAPIConstants.INTERVAL
- + " is defined incorrectly, expecting a number");
+ throw new IllegalArgumentException(
+ "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number");
}
} else {
configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
index 96e8393..df0926b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -23,12 +23,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
import org.apache.asterix.external.indexing.input.GenericFileAwareRecordReader;
import org.apache.asterix.external.indexing.input.GenericRecordReader;
import org.apache.asterix.external.indexing.input.TextualDataReader;
@@ -36,6 +30,10 @@
import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -76,10 +74,12 @@
*/
@Override
public InputStream getInputStream(int partition) throws IOException {
- if ((conf.getInputFormat() instanceof TextInputFormat || conf.getInputFormat() instanceof SequenceFileInputFormat)
- && (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase((String) configuration
- .get(AsterixTupleParserFactory.KEY_FORMAT)) || AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT
- .equalsIgnoreCase((String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT)))) {
+ if ((conf.getInputFormat() instanceof TextInputFormat
+ || conf.getInputFormat() instanceof SequenceFileInputFormat)
+ && (AsterixTupleParserFactory.FORMAT_ADM
+ .equalsIgnoreCase(configuration.get(AsterixTupleParserFactory.KEY_FORMAT))
+ || AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT
+ .equalsIgnoreCase(configuration.get(AsterixTupleParserFactory.KEY_FORMAT)))) {
if (files != null) {
return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
} else {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 11bf2a5..64c62f7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -25,7 +25,6 @@
import java.io.InputStream;
import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -61,7 +60,6 @@
}
}
-
@Override
public String getFilename(int partition) {
final FileSplit fileSplit = fileSplits[partition];
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
index 38e8a9e..bb40ac9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
@@ -18,22 +18,22 @@
*/
package org.apache.asterix.external.dataset.adapter;
+import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import org.apache.asterix.external.util.TweetProcessor;
import org.apache.asterix.external.util.TwitterUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+
import twitter4j.FilterQuery;
-import twitter4j.Query;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
-import java.util.concurrent.LinkedBlockingQueue;
-
/**
* An implementation of @see {PullBasedFeedClient} for the Twitter service. The
* feed client fetches data from Twitter service by sending request at regular
@@ -41,14 +41,12 @@
*/
public class PushBasedTwitterFeedClient extends FeedClient {
- private String keywords;
- private Query query;
-
private ARecordType recordType;
private TweetProcessor tweetProcessor;
private LinkedBlockingQueue<Status> inputQ;
- public PushBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PushBasedTwitterAdapter adapter) throws AsterixException {
+ public PushBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PushBasedTwitterAdapter adapter)
+ throws AsterixException {
this.recordType = recordType;
this.tweetProcessor = new TweetProcessor(recordType);
this.recordSerDe = new ARecordSerializerDeserializer(recordType);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
index edc79fe..136df05 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
@@ -20,16 +20,13 @@
import java.util.Map;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
-import org.apache.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.operators.file.ADMDataParser;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.ITupleParser;
@@ -38,7 +35,6 @@
/**
* This is the parser factory for parsers used to do indexing
*/
-@SuppressWarnings("deprecation")
public class HDFSIndexingParserFactory implements ITupleParserFactory {
private static final long serialVersionUID = 1L;
@@ -59,8 +55,8 @@
// adapter arguments
private Map<String, String> arguments;
- public HDFSIndexingParserFactory(ARecordType atype, String inputFormat, String format, char delimiter,
- char quote, String parserClassName) {
+ public HDFSIndexingParserFactory(ARecordType atype, String inputFormat, String format, char delimiter, char quote,
+ String parserClassName) {
this.inputFormat = inputFormat;
this.format = format;
this.parserClassName = parserClassName;
@@ -95,8 +91,7 @@
return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
} else if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
// choice 3 with delimited data parser
- DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype,
- delimiter, quote);
+ DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype, delimiter, quote);
return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
index 7fc335c..c6013a9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
@@ -23,12 +23,8 @@
import java.nio.ByteBuffer;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-
import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
-import org.apache.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import org.apache.asterix.external.indexing.input.RCFileLookupReader;
import org.apache.asterix.external.indexing.input.SequenceFileLookupInputStream;
import org.apache.asterix.external.indexing.input.SequenceFileLookupReader;
@@ -41,6 +37,8 @@
import org.apache.asterix.runtime.operators.file.ADMDataParser;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
@@ -86,7 +84,8 @@
// Create the lookup reader and the controlled parser
if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
configureRCFile(jobConf, iNullWriterFactory);
- } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_ADM)) {
+ } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
+ .equals(AsterixTupleParserFactory.FORMAT_ADM)) {
// create an adm parser
ADMDataParser dataParser = new ADMDataParser();
if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
@@ -100,7 +99,8 @@
parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
}
- } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
+ } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
+ .equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
// create a delimited text parser
char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
@@ -152,8 +152,8 @@
// Do nothing
}
- private void configureRCFile(Configuration jobConf, INullWriterFactory iNullWriterFactory) throws IOException,
- Exception {
+ private void configureRCFile(Configuration jobConf, INullWriterFactory iNullWriterFactory)
+ throws IOException, Exception {
// RCFileLookupReader
RCFileLookupReader reader = new RCFileLookupReader(fileIndexAccessor,
HDFSAdapterFactory.configureJobConf(configuration));
@@ -169,8 +169,8 @@
objectParser = new HiveObjectParser();
} else {
try {
- objectParser = (IAsterixHDFSRecordParser) Class.forName(
- configuration.get(HDFSAdapterFactory.KEY_PARSER)).newInstance();
+ objectParser = (IAsterixHDFSRecordParser) Class
+ .forName(configuration.get(HDFSAdapterFactory.KEY_PARSER)).newInstance();
} catch (Exception e) {
throw new HyracksDataException("Unable to create object parser", e);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IndexingScheduler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IndexingScheduler.java
index 371414c..2a51380 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IndexingScheduler.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IndexingScheduler.java
@@ -33,7 +33,6 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -41,7 +40,6 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
-@SuppressWarnings("deprecation")
public class IndexingScheduler {
private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
@@ -59,7 +57,7 @@
/**
* The constructor of the scheduler.
- *
+ *
* @param ncNameToNcInfos
* @throws HyracksException
*/
@@ -77,7 +75,7 @@
* Set location constraints for a file scan operator with a list of file
* splits. It tries to assign splits to their local machines fairly
* Locality is more important than fairness
- *
+ *
* @throws HyracksDataException
*/
public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
@@ -125,17 +123,16 @@
* push non-data-local upper-bounds slots to each machine
*/
locationToNumOfAssignement.clear();
- for(String nc: NCs){
+ for (String nc : NCs) {
locationToNumOfAssignement.put(nc, 0);
}
- for(int i=0; i< scheduled.length;i++){
- if(scheduled[i])
- {
- locationToNumOfAssignement.put(locations[i], locationToNumOfAssignement.get(locations[i])+1);
+ for (int i = 0; i < scheduled.length; i++) {
+ if (scheduled[i]) {
+ locationToNumOfAssignement.put(locations[i], locationToNumOfAssignement.get(locations[i]) + 1);
}
}
-
- scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled,locationToNumOfAssignement);
+
+ scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled, locationToNumOfAssignement);
return locations;
} catch (IOException e) {
throw new HyracksException(e);
@@ -144,7 +141,7 @@
/**
* Schedule non-local slots to each machine
- *
+ *
* @param splits
* The HDFS file splits.
* @param workloads
@@ -155,11 +152,12 @@
* The maximum slots of each machine.
* @param scheduled
* Indicate which slot is scheduled.
- * @param locationToNumOfAssignement
+ * @param locationToNumOfAssignement
*/
private void scheduleNonLocalSlots(InputSplit[] splits, final int[] workloads, String[] locations, int slotLimit,
- boolean[] scheduled, final HashMap<String,Integer> locationToNumOfAssignement) throws IOException, UnknownHostException {
-
+ boolean[] scheduled, final HashMap<String, Integer> locationToNumOfAssignement)
+ throws IOException, UnknownHostException {
+
PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(NCs.length, new Comparator<String>() {
@Override
public int compare(String s1, String s2) {
@@ -168,8 +166,7 @@
});
-
- for(String nc:NCs){
+ for (String nc : NCs) {
scheduleCadndiates.add(nc);
}
/**
@@ -193,7 +190,7 @@
/**
* Schedule data-local slots to each machine.
- *
+ *
* @param splits
* The HDFS file splits.
* @param workloads
@@ -216,8 +213,8 @@
PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
@Override
public int compare(String s1, String s2) {
- int assignmentDifference = locationToNumOfAssignement.get(s1).compareTo(
- locationToNumOfAssignement.get(s2));
+ int assignmentDifference = locationToNumOfAssignement.get(s1)
+ .compareTo(locationToNumOfAssignement.get(s2));
if (assignmentDifference != 0) {
return assignmentDifference;
}
@@ -267,8 +264,8 @@
locations[i] = nc;
workloads[pos]++;
scheduled[i] = true;
- locationToNumOfAssignement
- .put(candidate, locationToNumOfAssignement.get(candidate) + 1);
+ locationToNumOfAssignement.put(candidate,
+ locationToNumOfAssignement.get(candidate) + 1);
break;
}
}
@@ -287,7 +284,7 @@
/**
* Scan the splits once and build a popularity map
- *
+ *
* @param splits
* the split array
* @param locationToNumOfSplits
@@ -311,7 +308,7 @@
/**
* Load the IP-address-to-NC map from the NCNameToNCInfoMap
- *
+ *
* @param ncNameToNcInfos
* @throws HyracksException
*/
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
index 7ca5b72..2bd2a95 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
@@ -22,19 +22,17 @@
import java.io.IOException;
import java.io.InputStream;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.metadata.entities.ExternalFile;
+import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
-
/*
* This class is used for seek and read of external data of format adm or delimited text in sequence of text input format
*/
-@SuppressWarnings("deprecation")
public abstract class AbstractHDFSLookupInputStream extends InputStream {
protected String pendingValue = null;
@@ -45,7 +43,8 @@
protected ExternalFile file = new ExternalFile(null, null, 0, null, null, 0, ExternalFilePendingOp.PENDING_NO_OP);
protected ExternalFileIndexAccessor filesIndexAccessor;
- public AbstractHDFSLookupInputStream(ExternalFileIndexAccessor filesIndexAccessor, JobConf conf) throws IOException {
+ public AbstractHDFSLookupInputStream(ExternalFileIndexAccessor filesIndexAccessor, JobConf conf)
+ throws IOException {
this.filesIndexAccessor = filesIndexAccessor;
fs = FileSystem.get(conf);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSReader.java
index 577e9b2..65bfcf3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSReader.java
@@ -27,12 +27,11 @@
/***
* an abstract class to be used for reading hdfs based datasets one record at a time <- used for indexing->
*/
-@SuppressWarnings("deprecation")
public abstract class AbstractHDFSReader extends InputStream {
/***
* This function should be called once to do initial setup before starting to read records
- *
+ *
* @return true if ready for reading
*/
abstract public boolean initialize() throws Exception;
@@ -96,6 +95,7 @@
public void progress() {
}
+ @Override
public float getProgress() {
return 0.0f;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
index 04e43eb..6dbf464 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
+import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,17 +29,15 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.asterix.metadata.entities.ExternalFile;
/**
* This is a generic reader used for indexing external dataset or for performing full scan for external dataset with
* a stored snapshot
- * @author alamouda
*
+ * @author alamouda
*/
-@SuppressWarnings("deprecation")
-public class GenericFileAwareRecordReader extends GenericRecordReader{
-
+public class GenericFileAwareRecordReader extends GenericRecordReader {
+
private List<ExternalFile> files;
private FileSystem hadoopFS;
private long recordOffset = 0L;
@@ -71,16 +70,15 @@
/**
* read the split
*/
- try{
- String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath()
- .toUri().getPath();
+ try {
+ String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
FileStatus fileStatus = hadoopFS.getFileStatus(new Path(fileName));
//skip if not the same file stored in the files snapshot
- if(fileStatus.getModificationTime() != files.get(currentSplitIndex).getLastModefiedTime().getTime())
+ if (fileStatus.getModificationTime() != files.get(currentSplitIndex).getLastModefiedTime()
+ .getTime())
continue;
reader = getRecordReader(currentSplitIndex);
- }
- catch(Exception e){
+ } catch (Exception e) {
continue;
}
key = reader.createKey();
@@ -90,7 +88,7 @@
}
return false;
}
-
+
@SuppressWarnings("unchecked")
@Override
public Object readNext() throws IOException {
@@ -125,5 +123,5 @@
public int getFileNumber() throws Exception {
return files.get(currentSplitIndex).getFileNumber();
}
-
+
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericRecordReader.java
index 681e4bc..ab050a7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.indexing.input;
import java.io.IOException;
+
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -28,7 +29,7 @@
* This class can be used by any input format to perform full scan operations
*/
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class GenericRecordReader extends AbstractHDFSReader {
protected RecordReader reader;
@@ -86,8 +87,7 @@
}
protected RecordReader getRecordReader(int slitIndex) throws IOException {
- RecordReader reader = conf.getInputFormat().getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ RecordReader reader = conf.getInputFormat().getRecordReader(inputSplits[slitIndex], conf, getReporter());
return reader;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
index d44359b..4b89f59 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
+import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,15 +29,13 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
-
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
//Used in two cases:
//1. building an index over a dataset
//2. performing full scan over a dataset that has built index (to provide consistent view) with RCFile format
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings("rawtypes")
public class RCFileDataReader extends AbstractHDFSReader {
private RecordReader reader;
@@ -121,10 +120,9 @@
private RecordReader getRecordReader(int slitIndex) throws IOException {
RecordReader reader;
- try{
- reader = conf.getInputFormat().getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
- } catch(Exception e){
+ try {
+ reader = conf.getInputFormat().getRecordReader(inputSplits[slitIndex], conf, getReporter());
+ } catch (Exception e) {
e.printStackTrace();
throw e;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
index 3bf024a..797b961 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
+import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,15 +32,13 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
-
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
// Used in two cases:
// 1. building an index over a dataset
// 2. performing full scan over a dataset that has built index (to provide consistent view)
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings("rawtypes")
public class TextualDataReader extends AbstractHDFSReader {
private RecordReader<Object, Text> reader;
@@ -151,24 +150,21 @@
continue;
}
key = reader.createKey();
- value = (Text) reader.createValue();
+ value = reader.createValue();
return true;
}
}
return false;
}
-
private RecordReader getRecordReader(int splitIndex) throws IOException {
RecordReader reader;
if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf,
- getReporter());
+ reader = format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
} else {
TextInputFormat format = (TextInputFormat) conf.getInputFormat();
- reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf,
- getReporter());
+ reader = format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
}
return reader;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index 9808f8f..e769ad1 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -63,7 +63,7 @@
String dataverse = finfo.getFunctionIdentifier().getNamespace();
ClassLoader libraryClassLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, functionLibary);
String classname = finfo.getFunctionBody().trim();
- Class clazz;
+ Class<?> clazz;
try {
clazz = Class.forName(classname, true, libraryClassLoader);
externalFunctionFactory = (IFunctionFactory) clazz.newInstance();
@@ -88,8 +88,8 @@
// Type-cast the source array based on the input type that this function wants to receive.
ATypeTag targetTypeTag = finfo.getParamList().get(i).getTypeTag();
- ATypeTag sourceTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[inputVal
- .getStartOffset()]);
+ ATypeTag sourceTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(inputVal.getByteArray()[inputVal.getStartOffset()]);
if (sourceTypeTag != targetTypeTag) {
castBuffer.reset();
ATypeHierarchy.convertNumericTypeByteArray(inputVal.getByteArray(), inputVal.getStartOffset(),
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
index b3a1130..be9f9c1 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
@@ -46,10 +46,9 @@
}
class ExternalScalarFunctionDescriptor extends AbstractScalarFunctionDynamicDescriptor implements IFunctionDescriptor {
-
+ private static final long serialVersionUID = 1L;
private final IFunctionInfo finfo;
private ICopyEvaluatorFactory evaluatorFactory;
- private ICopyEvaluatorFactory[] args;
@Override
public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) throws AlgebricksException {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java
index 3c5c20f..d0d44e3 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java
@@ -44,7 +44,6 @@
}
class ExternalScalarFunction extends ExternalFunction implements IExternalScalarFunction, ICopyEvaluator {
- private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
public ExternalScalarFunction(IExternalFunctionInfo finfo, ICopyEvaluatorFactory args[],
@@ -69,6 +68,7 @@
}
}
+ @Override
public void evaluate(IFunctionHelper argumentProvider) throws Exception {
((IExternalScalarFunction) externalFunction).evaluate(argumentProvider);
/*
@@ -82,5 +82,4 @@
}
}
-
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/JTypeObjectFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/JTypeObjectFactory.java
index 649cf81..677ed76 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/JTypeObjectFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/JTypeObjectFactory.java
@@ -144,6 +144,8 @@
}
}
return retValue = itemObject;
+ default:
+ break;
}
return retValue;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
index 39c5116..2671f13 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
@@ -19,7 +19,6 @@
package org.apache.asterix.external.library;
import java.io.DataOutput;
-import java.nio.ByteBuffer;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -42,7 +41,6 @@
public class ResultCollector implements IResultCollector {
private IAObject reusableResultObjectHolder;
- private ByteBuffer reusableResultBinaryHolder;
private IDataOutputProvider outputProvider;
private IExternalFunctionInfo finfo;
@@ -50,7 +48,6 @@
this.finfo = finfo;
IAType returnType = finfo.getReturnType();
reusableResultObjectHolder = allocateResultObjectHolder(returnType);
- reusableResultBinaryHolder = allocateResultBinaryHolder(returnType);
this.outputProvider = outputProvider;
}
@@ -73,24 +70,8 @@
fieldObjects[i] = allocateResultObjectHolder(fieldType[i]);
}
return new AMutableRecord((ARecordType) type, fieldObjects);
- }
- return null;
- }
-
- private ByteBuffer allocateResultBinaryHolder(IAType type) {
- switch (type.getTypeTag()) {
- case INT32:
- return ByteBuffer.allocate(4);
- case FLOAT:
- return ByteBuffer.allocate(4);
- case DOUBLE:
- return ByteBuffer.allocate(8);
- case STRING:
- return ByteBuffer.allocate(32 * 1024);
- case ORDEREDLIST:
- return ByteBuffer.allocate(32 * 1024);
- case RECORD:
- return ByteBuffer.allocate(32 * 1024);
+ default:
+ break;
}
return null;
}
@@ -130,14 +111,15 @@
serializeResult(list);
}
+ @Override
public IAObject getComplexTypeResultHolder() {
return reusableResultObjectHolder;
}
private void serializeResult(IAObject object) throws AsterixException {
try {
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(finfo.getReturnType()).serialize(
- reusableResultObjectHolder, outputProvider.getDataOutput());
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(finfo.getReturnType())
+ .serialize(reusableResultObjectHolder, outputProvider.getDataOutput());
} catch (HyracksDataException hde) {
throw new AsterixException(hde);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 6010e54..677e913 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -18,6 +18,11 @@
*/
package org.apache.asterix.external.library.java;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.LinkedHashMap;
+import java.util.List;
+
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
@@ -81,13 +86,6 @@
import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.List;
public class JObjectAccessors {
@@ -136,6 +134,8 @@
case DURATION:
accessor = new JDurationAccessor();
break;
+ default:
+ break;
}
return accessor;
}
@@ -236,8 +236,8 @@
int l = pointable.getLength();
String v = null;
- v = aStringSerDer.deserialize(
- new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1))).getStringValue();
+ v = aStringSerDer.deserialize(new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1)))
+ .getStringValue();
JObjectUtil.getNormalizedString(v);
IJObject jObject = objectPool.allocate(BuiltinType.ASTRING);
@@ -296,8 +296,8 @@
byte[] b = pointable.getByteArray();
int s = pointable.getStartOffset();
int l = pointable.getLength();
- ADuration duration = ADurationSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(b, s, l)));
+ ADuration duration = ADurationSerializerDeserializer.INSTANCE
+ .deserialize(new DataInputStream(new ByteArrayInputStream(b, s, l)));
IJObject jObject = objectPool.allocate(BuiltinType.ADURATION);
((JDuration) jObject).setValue(duration.getMonths(), duration.getMilliseconds());
return jObject;
@@ -348,8 +348,8 @@
byte[] b = pointable.getByteArray();
int s = pointable.getStartOffset();
int l = pointable.getLength();
- ACircle v = ACircleSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(b, s, l)));
+ ACircle v = ACircleSerializerDeserializer.INSTANCE
+ .deserialize(new DataInputStream(new ByteArrayInputStream(b, s, l)));
JPoint jpoint = (JPoint) objectPool.allocate(BuiltinType.APOINT);
jpoint.setValue(v.getP().getX(), v.getP().getY());
IJObject jObject = objectPool.allocate(BuiltinType.ACIRCLE);
@@ -366,8 +366,8 @@
byte[] b = pointable.getByteArray();
int s = pointable.getStartOffset();
int l = pointable.getLength();
- APoint v = APointSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(new ByteArrayInputStream(
- b, s, l)));
+ APoint v = APointSerializerDeserializer.INSTANCE
+ .deserialize(new DataInputStream(new ByteArrayInputStream(b, s, l)));
JPoint jObject = (JPoint) objectPool.allocate(BuiltinType.APOINT);
jObject.setValue(v.getX(), v.getY());
return jObject;
@@ -382,8 +382,8 @@
byte[] b = pointable.getByteArray();
int s = pointable.getStartOffset();
int l = pointable.getLength();
- APoint3D v = APoint3DSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(b, s, l)));
+ APoint3D v = APoint3DSerializerDeserializer.INSTANCE
+ .deserialize(new DataInputStream(new ByteArrayInputStream(b, s, l)));
JPoint3D jObject = (JPoint3D) objectPool.allocate(BuiltinType.APOINT3D);
jObject.setValue(v.getX(), v.getY(), v.getZ());
return jObject;
@@ -398,8 +398,8 @@
byte[] b = pointable.getByteArray();
int s = pointable.getStartOffset();
int l = pointable.getLength();
- ALine v = ALineSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(new ByteArrayInputStream(b,
- s, l)));
+ ALine v = ALineSerializerDeserializer.INSTANCE
+ .deserialize(new DataInputStream(new ByteArrayInputStream(b, s, l)));
JLine jObject = (JLine) objectPool.allocate(BuiltinType.ALINE);
jObject.setValue(v.getP1(), v.getP2());
return jObject;
@@ -414,8 +414,8 @@
byte[] b = pointable.getByteArray();
int s = pointable.getStartOffset();
int l = pointable.getLength();
- APolygon v = APolygonSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(b, s, l)));
+ APolygon v = APolygonSerializerDeserializer.INSTANCE
+ .deserialize(new DataInputStream(new ByteArrayInputStream(b, s, l)));
JPolygon jObject = (JPolygon) objectPool.allocate(BuiltinType.APOLYGON);
jObject.setValue(v.getPoints());
return jObject;
@@ -430,8 +430,8 @@
byte[] b = pointable.getByteArray();
int s = pointable.getStartOffset();
int l = pointable.getLength();
- ARectangle v = ARectangleSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(b, s, l)));
+ ARectangle v = ARectangleSerializerDeserializer.INSTANCE
+ .deserialize(new DataInputStream(new ByteArrayInputStream(b, s, l)));
JRectangle jObject = (JRectangle) objectPool.allocate(BuiltinType.ARECTANGLE);
jObject.setValue(v.getP1(), v.getP2());
return jObject;
@@ -461,7 +461,7 @@
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
- ARecordVisitablePointable recordPointable = (ARecordVisitablePointable) pointable;
+ ARecordVisitablePointable recordPointable = pointable;
List<IVisitablePointable> fieldPointables = recordPointable.getFieldValues();
List<IVisitablePointable> fieldTypeTags = recordPointable.getFieldTypeTags();
List<IVisitablePointable> fieldNames = recordPointable.getFieldNames();
@@ -473,8 +473,8 @@
closedPart = index < recordType.getFieldTypes().length;
IVisitablePointable tt = fieldTypeTags.get(index);
IAType fieldType = closedPart ? recordType.getFieldTypes()[index] : null;
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tt.getByteArray()[tt
- .getStartOffset()]);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(tt.getByteArray()[tt.getStartOffset()]);
IVisitablePointable fieldName = fieldNames.get(index);
typeInfo.reset(fieldType, typeTag);
switch (typeTag) {
@@ -487,8 +487,8 @@
// value is null
fieldObject = null;
} else {
- fieldObject = pointableVisitor
- .visit((AListVisitablePointable) fieldPointable, typeInfo);
+ fieldObject = pointableVisitor.visit((AListVisitablePointable) fieldPointable,
+ typeInfo);
}
break;
case ANY:
@@ -502,8 +502,9 @@
byte[] b = fieldName.getByteArray();
int s = fieldName.getStartOffset();
int l = fieldName.getLength();
- String v = aStringSerDer.deserialize(
- new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1))).getStringValue();
+ String v = aStringSerDer
+ .deserialize(new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1)))
+ .getStringValue();
openFields.put(v, fieldObject);
}
index++;
@@ -538,8 +539,7 @@
@Override
public IJObject access(AListVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool,
- IAType listType,
- JObjectPointableVisitor pointableVisitor) throws HyracksDataException {
+ IAType listType, JObjectPointableVisitor pointableVisitor) throws HyracksDataException {
List<IVisitablePointable> items = pointable.getItems();
List<IVisitablePointable> itemTags = pointable.getItemTags();
JList list = pointable.ordered() ? new JOrderedList(listType) : new JUnorderedList(listType);
@@ -549,8 +549,8 @@
for (IVisitablePointable itemPointable : items) {
IVisitablePointable itemTagPointable = itemTags.get(index);
- ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(itemTagPointable
- .getByteArray()[itemTagPointable.getStartOffset()]);
+ ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(itemTagPointable.getByteArray()[itemTagPointable.getStartOffset()]);
typeInfo.reset(listType.getType(), listType.getTypeTag());
switch (itemTypeTag) {
case RECORD:
@@ -561,17 +561,14 @@
listItem = pointableVisitor.visit((AListVisitablePointable) itemPointable, typeInfo);
break;
case ANY:
- throw new IllegalArgumentException("Cannot parse list item of type "
- + listType.getTypeTag());
+ throw new IllegalArgumentException(
+ "Cannot parse list item of type " + listType.getTypeTag());
default:
IAType itemType = ((AbstractCollectionType) listType).getItemType();
typeInfo.reset(itemType, itemType.getTypeTag());
listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo);
}
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(itemPointable.getByteArray()[itemPointable.getStartOffset()]);
-
list.add(listItem);
}
} catch (AsterixException exception) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
index a0710ff..b5458e2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.external.library.java;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
@@ -56,20 +60,16 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
public class JObjectUtil {
/**
- * Normalize an input string by removing linebreaks, and replace them with space
- * Also remove non-readable special characters
+ * Normalize an input string by removing linebreaks, and replace them with space
+ * Also remove non-readable special characters
*
* @param originalString
- * The input String
+ * The input String
* @return
- * String - the normalized string
+ * String - the normalized string
*/
public static String getNormalizedString(String originalString) {
int len = originalString.length();
@@ -221,7 +221,7 @@
p1.setValue(dis.readDouble(), dis.readDouble());
points.add(p1);
}
- ((JPolygon) jObject).setValue(points.toArray(new APoint[]{}));
+ ((JPolygon) jObject).setValue(points.toArray(new APoint[] {}));
break;
}
@@ -267,7 +267,7 @@
dis.readInt();
}
for (int i = 0; i < numberOfitems; i++) {
- IJObject v = (IJObject) getJType(elementType.getTypeTag(), elementType, dis, objectPool);
+ IJObject v = getJType(elementType.getTypeTag(), elementType, dis, objectPool);
((JUnorderedList) jObject).add(v);
}
}
@@ -302,7 +302,7 @@
dis.readInt();
}
for (int i = 0; i < numberOfitems; i++) {
- IJObject v = (IJObject) getJType(elementType.getTypeTag(), elementType, dis, objectPool);
+ IJObject v = getJType(elementType.getTypeTag(), elementType, dis, objectPool);
((JOrderedList) jObject).add(v);
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjects.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjects.java
index 02f7b4b..406d242 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjects.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjects.java
@@ -18,6 +18,18 @@
*/
package org.apache.asterix.external.library.java;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.asterix.builders.IAsterixListBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.builders.UnorderedListBuilder;
@@ -44,7 +56,6 @@
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ADouble;
import org.apache.asterix.om.base.AFloat;
-import org.apache.asterix.om.base.AInt16;
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AInt8;
@@ -83,18 +94,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
public class JObjects {
public static abstract class JObject implements IJObject {
@@ -200,11 +199,11 @@
}
public void setValue(byte v) {
- ((AMutableInt16) value).setValue(v);
+ value.setValue(v);
}
public short getValue() {
- return ((AMutableInt16) value).getShortValue();
+ return value.getShortValue();
}
@Override
@@ -216,12 +215,12 @@
throw new HyracksDataException(e);
}
}
- AInt16SerializerDeserializer.INSTANCE.serialize((AInt16) value, dataOutput);
+ AInt16SerializerDeserializer.INSTANCE.serialize(value, dataOutput);
}
@Override
public void reset() {
- ((AMutableInt16) value).setValue((short) 0);
+ value.setValue((short) 0);
}
}
@@ -930,6 +929,7 @@
this.jObjects = new ArrayList<IJObject>();
}
+ @Override
public void add(IJObject jObject) {
jObjects.add(jObject);
}
@@ -980,7 +980,6 @@
private Map<String, IJObject> openFields;
private final AStringSerializerDeserializer aStringSerDer = new AStringSerializerDeserializer();
-
public JRecord(ARecordType recordType, IJObject[] fields) {
this.recordType = recordType;
this.fields = fields;
@@ -993,21 +992,6 @@
this.openFields = openFields;
}
- private ARecordType getARecordType(String[] fieldNames, IJObject[] fields) throws AsterixException {
- IAType[] fieldTypes = new IAType[fields.length];
- int index = 0;
- for (IJObject jObj : fields) {
- fieldTypes[index++] = jObj.getIAObject().getType();
- }
- ARecordType recordType;
- try {
- recordType = new ARecordType(null, fieldNames, fieldTypes, false);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- }
- return recordType;
- }
-
public void addField(String fieldName, IJObject fieldValue) throws AsterixException {
int pos = getFieldPosByName(fieldName);
if (pos >= 0) {
@@ -1086,6 +1070,7 @@
return recordBuilder;
}
+ @Override
public void serialize(DataOutput output, boolean writeTypeTag) throws HyracksDataException {
RecordBuilder recordBuilder = new RecordBuilder();
recordBuilder.reset(recordType);
@@ -1128,6 +1113,7 @@
return value;
}
+ @Override
public void reset() throws AlgebricksException {
if (openFields != null && !openFields.isEmpty()) {
openFields.clear();
diff --git a/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/FuzzyJoinAppendLength.java b/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/FuzzyJoinAppendLength.java
index 138aee2..6778152 100644
--- a/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/FuzzyJoinAppendLength.java
+++ b/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/FuzzyJoinAppendLength.java
@@ -25,7 +25,6 @@
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import org.apache.asterix.fuzzyjoin.tokenizer.Tokenizer;
import org.apache.asterix.fuzzyjoin.tokenizer.TokenizerFactory;
@@ -44,11 +43,10 @@
int[] dataColumns = FuzzyJoinUtil.getDataColumns("2,3");
String line;
- HashMap<String, MutableInteger> tokenCount = new HashMap<String, MutableInteger>();
while ((line = input.readLine()) != null) {
String[] splits = line.split(FuzzyJoinConfig.RECORD_SEPARATOR_REGEX);
- Collection<String> tokens = tokenizer.tokenize(FuzzyJoinUtil.getData(splits, dataColumns,
- FuzzyJoinConfig.TOKEN_SEPARATOR));
+ Collection<String> tokens = tokenizer
+ .tokenize(FuzzyJoinUtil.getData(splits, dataColumns, FuzzyJoinConfig.TOKEN_SEPARATOR));
output.write(splits[0] + FuzzyJoinConfig.RECORD_SEPARATOR + splits[1] + FuzzyJoinConfig.RECORD_SEPARATOR
+ splits[2] + FuzzyJoinConfig.RECORD_SEPARATOR + splits[3] + FuzzyJoinConfig.RECORD_SEPARATOR
+ tokens.size() + "\n");
diff --git a/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/FuzzyJoinTokenize.java b/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/FuzzyJoinTokenize.java
index f56d01e..62b34c0 100644
--- a/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/FuzzyJoinTokenize.java
+++ b/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/FuzzyJoinTokenize.java
@@ -39,7 +39,7 @@
import org.apache.asterix.fuzzyjoin.tokenorder.TokenRankFrequency;
public class FuzzyJoinTokenize {
- public static class TokenCount implements Comparable {
+ public static class TokenCount implements Comparable<Object> {
public String token;
public MutableInteger count;
@@ -111,14 +111,14 @@
tokenLoad.loadTokenRank();
input = new BufferedReader(new FileReader(inputFileName));
- LittleEndianIntOutputStream outputTokenized = new LittleEndianIntOutputStream(new BufferedOutputStream(
- new FileOutputStream(tokenizedFileName)));
+ LittleEndianIntOutputStream outputTokenized = new LittleEndianIntOutputStream(
+ new BufferedOutputStream(new FileOutputStream(tokenizedFileName)));
while ((line = input.readLine()) != null) {
String splits[] = line.split(FuzzyJoinConfig.RECORD_SEPARATOR_REGEX);
int rid = Integer.parseInt(splits[FuzzyJoinConfig.RECORD_KEY]);
outputTokenized.writeInt(rid);
- Collection<String> tokens = tokenizer.tokenize(FuzzyJoinUtil.getData(splits, dataColumns,
- FuzzyJoinConfig.TOKEN_SEPARATOR));
+ Collection<String> tokens = tokenizer
+ .tokenize(FuzzyJoinUtil.getData(splits, dataColumns, FuzzyJoinConfig.TOKEN_SEPARATOR));
Collection<Integer> tokensRanked = tokenRank.getTokenRanks(tokens);
outputTokenized.writeInt(tokensRanked.size());
for (Integer token : tokensRanked) {
diff --git a/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/MutableInteger.java b/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/MutableInteger.java
index 5083d3a..f7d7d35 100644
--- a/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/MutableInteger.java
+++ b/asterix-fuzzyjoin/src/main/java/org/apache/asterix/fuzzyjoin/MutableInteger.java
@@ -19,7 +19,7 @@
package org.apache.asterix.fuzzyjoin;
-public class MutableInteger implements Comparable {
+public class MutableInteger implements Comparable<Object> {
private int v;
public MutableInteger(int v) {
diff --git a/asterix-fuzzyjoin/src/test/java/org/apache/asterix/fuzzyjoin/tests/FuzzyJoinTestUtil.java b/asterix-fuzzyjoin/src/test/java/org/apache/asterix/fuzzyjoin/tests/FuzzyJoinTestUtil.java
index f4d1aebd..703db60 100644
--- a/asterix-fuzzyjoin/src/test/java/org/apache/asterix/fuzzyjoin/tests/FuzzyJoinTestUtil.java
+++ b/asterix-fuzzyjoin/src/test/java/org/apache/asterix/fuzzyjoin/tests/FuzzyJoinTestUtil.java
@@ -29,15 +29,12 @@
public class FuzzyJoinTestUtil {
- public static void verifyDirectory(String pathTest, String pathCorrect)
- throws IOException {
+ public static void verifyDirectory(String pathTest, String pathCorrect) throws IOException {
verifyDirectory(pathTest, pathCorrect, false);
}
- public static void verifyDirectory(String pathTest, String pathCorrect,
- boolean noDup) throws IOException {
- int countTest = 0, countTestDedup = 0, countCorrect = 0;
-
+ public static void verifyDirectory(String pathTest, String pathCorrect, boolean noDup) throws IOException {
+ int countTestDedup = 0, countCorrect = 0;
BufferedReader input;
String line;
HashSet<String> buffer = new HashSet<String>();
@@ -46,7 +43,6 @@
input = new BufferedReader(new FileReader(pathTest));
while ((line = input.readLine()) != null) {
buffer.add(line);
- countTest++;
}
countTestDedup = buffer.size();
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/error/OutputHandler.java b/asterix-installer/src/main/java/org/apache/asterix/installer/error/OutputHandler.java
index 1da9ee4..4c83706 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/error/OutputHandler.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/error/OutputHandler.java
@@ -35,6 +35,7 @@
}
+ @Override
public OutputAnalysis reportEventOutput(Event event, String output) {
EventType eventType = EventType.valueOf(event.getType().toUpperCase());
@@ -83,6 +84,8 @@
ignore = false;
}
break;
+ default:
+ break;
}
if (ignore) {
return new OutputAnalysis(true, null);
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
index 52ee4b4..1dd69df 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.Map;
-import java.util.logging.Logger;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
@@ -53,11 +52,8 @@
private static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
private static final int zookeeperClientPort = 2900;
private static final int zookeeperTestClientPort = 3945;
-
private static IHyracksClientConnection hcc;
- private static final Logger LOGGER = Logger.getLogger(AsterixInstallerIntegrationUtil.class.getName());
-
public static void deinit() throws Exception {
deleteInstance();
stopZookeeper();
@@ -76,12 +72,12 @@
})[0];
managixHome = new File(installerTargetDir, managixHomeDirName).getAbsolutePath();
- System.setProperty("log4j.configuration", managixHome + File.separator + "conf" + File.separator
- + "log4j.properties");
+ System.setProperty("log4j.configuration",
+ managixHome + File.separator + "conf" + File.separator + "log4j.properties");
managixHome = AsterixInstallerIntegrationUtil.getManagixHome();
- clusterConfigurationPath = managixHome + File.separator + "clusters" + File.separator + "local"
- + File.separator + "local.xml";
+ clusterConfigurationPath = managixHome + File.separator + "clusters" + File.separator + "local" + File.separator
+ + "local.xml";
InstallerDriver.setManagixHome(managixHome);
@@ -206,7 +202,8 @@
return managixHome;
}
- public static void installLibrary(String libraryName, String libraryDataverse, String libraryPath) throws Exception {
+ public static void installLibrary(String libraryName, String libraryDataverse, String libraryPath)
+ throws Exception {
transformIntoRequiredState(State.INACTIVE);
String command = "install -n " + ASTERIX_INSTANCE_NAME + " -d " + libraryDataverse + " -l " + libraryName
+ " -p " + libraryPath;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index d3d481c..df48269 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -62,7 +62,7 @@
/**
* Create all metadata record types.
- *
+ *
* @throws HyracksDataException
*/
public static void init() throws MetadataException, HyracksDataException {
@@ -150,9 +150,10 @@
private static final ARecordType createDataverseRecordType() throws AsterixException {
try {
- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp",
- "PendingOp" }, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- BuiltinType.AINT32 }, true);
+ return new ARecordType("DataverseRecordType",
+ new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 },
+ true);
} catch (HyracksDataException e) {
throw new AsterixException(e);
}
@@ -203,12 +204,10 @@
public static final int EXTERNAL_DETAILS_ARECORD_TRANSACTION_STATE_FIELD_INDEX = 3;
private static final ARecordType createExternalDetailsRecordType() throws AsterixException {
-
AOrderedListType orderedPropertyListType = new AOrderedListType(DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null);
- AOrderedListType compactionPolicyPropertyListType = new AOrderedListType(
- COMPACTION_POLICY_PROPERTIES_RECORDTYPE, null);
String[] fieldNames = { "DatasourceAdapter", "Properties", "LastRefreshTime", "TransactionState", };
- IAType[] fieldTypes = { BuiltinType.ASTRING, orderedPropertyListType, BuiltinType.ADATETIME, BuiltinType.AINT32 };
+ IAType[] fieldTypes = { BuiltinType.ASTRING, orderedPropertyListType, BuiltinType.ADATETIME,
+ BuiltinType.AINT32 };
try {
return new ARecordType(null, fieldNames, fieldTypes, true);
} catch (HyracksDataException e) {
@@ -388,8 +387,8 @@
private static final ARecordType createIndexRecordType() throws AsterixException {
AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
AOrderedListType ololType = new AOrderedListType(olType, null);
- String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
- "IsPrimary", "Timestamp", "PendingOp" };
+ String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey", "IsPrimary",
+ "Timestamp", "PendingOp" };
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
ololType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
try {
@@ -518,7 +517,8 @@
public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_NAME_FIELD_INDEX = 0;
public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 1;
- private static final ARecordType createPrimaryFeedDetailsRecordType() throws AsterixException, HyracksDataException {
+ private static final ARecordType createPrimaryFeedDetailsRecordType()
+ throws AsterixException, HyracksDataException {
AUnorderedListType unorderedAdaptorPropertyListType = new AUnorderedListType(
DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null);
@@ -529,8 +529,8 @@
public static final int FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX = 0;
- private static final ARecordType createSecondaryFeedDetailsRecordType() throws AsterixException,
- HyracksDataException {
+ private static final ARecordType createSecondaryFeedDetailsRecordType()
+ throws AsterixException, HyracksDataException {
String[] fieldNames = { "SourceFeedName" };
IAType[] fieldTypes = { BuiltinType.ASTRING };
return new ARecordType(null, fieldNames, fieldTypes, true);
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
index 946c950..cf7a95c 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
@@ -24,7 +24,7 @@
import org.apache.asterix.metadata.api.IMetadataEntity;
public class Function implements IMetadataEntity {
-
+ private static final long serialVersionUID = 1L;
public static final String LANGUAGE_AQL = "AQL";
public static final String LANGUAGE_JAVA = "JAVA";
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
index 10895f2..e09928b 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
@@ -37,7 +37,6 @@
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
import org.apache.asterix.metadata.entities.FeedPolicy;
import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AMutableInt32;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
@@ -66,13 +65,11 @@
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(MetadataRecordTypes.FEED_POLICY_RECORDTYPE);
- private AMutableInt32 aInt32;
protected ISerializerDeserializer<AInt32> aInt32Serde;
@SuppressWarnings("unchecked")
public FeedPolicyTupleTranslator(boolean getTuple) {
super(getTuple, MetadataPrimaryIndexes.FEED_POLICY_DATASET.getFieldCount());
- aInt32 = new AMutableInt32(-1);
aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
}
@@ -83,7 +80,7 @@
int recordLength = frameTuple.getFieldLength(FEED_POLICY_PAYLOAD_TUPLE_FIELD_INDEX);
ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
DataInput in = new DataInputStream(stream);
- ARecord feedPolicyRecord = (ARecord) recordSerDes.deserialize(in);
+ ARecord feedPolicyRecord = recordSerDes.deserialize(in);
return createFeedPolicyFromARecord(feedPolicyRecord);
}
@@ -150,8 +147,8 @@
// write field 3 (properties)
Map<String, String> properties = feedPolicy.getProperties();
UnorderedListBuilder listBuilder = new UnorderedListBuilder();
- listBuilder
- .reset((AUnorderedListType) MetadataRecordTypes.FEED_POLICY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX]);
+ listBuilder.reset((AUnorderedListType) MetadataRecordTypes.FEED_POLICY_RECORDTYPE
+ .getFieldTypes()[MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX]);
for (Map.Entry<String, String> property : properties.entrySet()) {
String name = property.getKey();
String value = property.getValue();
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 425c299..ba5aac7 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -59,7 +59,6 @@
private final IFeedManager feedManager;
private final IHyracksTaskContext ctx;
private final IFeedAdapterFactory adapterFactory;
- private final FeedPolicyAccessor policyAccessor;
private IngestionRuntime ingestionRuntime;
private IFeedAdapter adapter;
@@ -77,7 +76,6 @@
.getApplicationContext().getApplicationObject();
this.feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
this.feedManager = runtimeCtx.getFeedManager();
- this.policyAccessor = policyAccessor;
}
@Override
@@ -96,8 +94,8 @@
throw new HyracksDataException(e);
}
FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
- feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition, fta,
- feedManager);
+ feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
+ fta, feedManager);
adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, tracker, feedFrameWriter, partition);
SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
partition);
@@ -115,7 +113,7 @@
LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+ " connected to backend for feed " + feedId);
}
- feedFrameWriter = (DistributeFeedFrameWriter) ingestionRuntime.getFeedFrameWriter();
+ feedFrameWriter = ingestionRuntime.getFeedFrameWriter();
} else {
String message = "Feed Ingestion Runtime for feed " + feedId
+ " is already registered and is active!.";
@@ -125,17 +123,17 @@
}
waitTillIngestionIsOver(adapterRuntimeManager);
- feedSubscriptionManager.deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime
- .getRuntimeId());
+ feedSubscriptionManager
+ .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
if (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
throw new HyracksDataException("Unable to ingest data");
}
} catch (InterruptedException ie) {
/*
- * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
+ * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
* As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
- * The surviving intake partitions must continue to live and receive data from the external source.
+ * The surviving intake partitions must continue to live and receive data from the external source.
*/
List<ISubscriberRuntime> subscribers = ingestionRuntime.getSubscribers();
FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap<String, String>());
@@ -155,7 +153,8 @@
ingestionRuntime.unsubscribeFeed((CollectionRuntime) failingSubscriber);
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
+ LOGGER.warning(
+ "Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
}
}
}
@@ -167,10 +166,11 @@
}
} else {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
+ LOGGER.info(
+ "Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
}
- feedSubscriptionManager.deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime
- .getRuntimeId());
+ feedSubscriptionManager
+ .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
throw new HyracksDataException(ie);
}
} catch (Exception e) {
@@ -199,8 +199,8 @@
LOGGER.info("Waiting for adaptor [" + partition + "]" + "to be done with ingestion of feed " + feedId);
}
synchronized (adapterRuntimeManager) {
- while (!(adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FINISHED_INGESTION) || (adapterRuntimeManager
- .getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)))) {
+ while (!(adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FINISHED_INGESTION)
+ || (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)))) {
adapterRuntimeManager.wait();
}
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 0488ffa..313fa1a 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -162,6 +162,8 @@
.getStorageTimeTrackingRateTask();
sTask.receiveCommitAckResponse(commitResponseMessage);
break;
+ default:
+ break;
}
}
@@ -222,8 +224,8 @@
FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
FeedRuntimeType.INTAKE, partition);
- ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager().getSubscribableRuntime(
- subscribableRuntimeId);
+ ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
+ .getSubscribableRuntime(subscribableRuntimeId);
IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
adapterRuntimeManager.stop();
if (LOGGER.isLoggable(Level.INFO)) {
@@ -252,8 +254,8 @@
}
runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
- CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(
- connectionId, runtimeId);
+ CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
+ .getFeedRuntime(connectionId, runtimeId);
feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
@@ -268,9 +270,9 @@
// feed could be primary or secondary, doesn't matter
SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(
connectionId.getFeedId(), FeedRuntimeType.COMPUTE, partition);
- ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager().getSubscribableRuntime(
- feedSubscribableRuntimeId);
- DistributeFeedFrameWriter dWriter = (DistributeFeedFrameWriter) feedRuntime.getFeedFrameWriter();
+ ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
+ .getSubscribableRuntime(feedSubscribableRuntimeId);
+ DistributeFeedFrameWriter dWriter = feedRuntime.getFeedFrameWriter();
Map<IFrameWriter, FeedFrameCollector> registeredCollectors = dWriter.getRegisteredReaders();
IFrameWriter unsubscribingWriter = null;
@@ -287,6 +289,8 @@
}
}
break;
+ default:
+ break;
}
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
index b3209d1..e5e9df3 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
@@ -29,8 +29,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
@@ -59,6 +57,7 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -233,10 +232,10 @@
Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
- IOperatorDescriptor leftOpDesc = altered.getOperatorMap().get(
- oldNewOID.get(leftOp.getLeft().getOperatorId()));
- IOperatorDescriptor rightOpDesc = altered.getOperatorMap().get(
- oldNewOID.get(rightOp.getLeft().getOperatorId()));
+ IOperatorDescriptor leftOpDesc = altered.getOperatorMap()
+ .get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
+ IOperatorDescriptor rightOpDesc = altered.getOperatorMap()
+ .get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
}
@@ -269,6 +268,8 @@
lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
locations.add(lc);
break;
+ default:
+ break;
}
}
@@ -416,7 +417,8 @@
locations.add(lc);
}
}
-
+ break;
+ default:
break;
}
}
@@ -444,8 +446,8 @@
try {
MetadataManager.INSTANCE.acquireReadLatch();
ctx = MetadataManager.INSTANCE.beginTransaction();
- feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(), connectionId
- .getFeedId().getFeedName());
+ feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(),
+ connectionId.getFeedId().getFeedName());
preProcessingRequired = feed.getAppliedFunction() != null;
MetadataManager.INSTANCE.commitTransaction(ctx);
} catch (Exception e) {
@@ -463,9 +465,8 @@
return preProcessingRequired;
}
- public static Triple<IFeedAdapterFactory, ARecordType, AdapterType> getPrimaryFeedFactoryAndOutput(
- PrimaryFeed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx)
- throws AlgebricksException {
+ public static Triple<IFeedAdapterFactory, ARecordType, AdapterType> getPrimaryFeedFactoryAndOutput(PrimaryFeed feed,
+ FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
String adapterName = null;
DatasourceAdapter adapterEntity = null;
@@ -580,8 +581,8 @@
outputType = function.getReturnType();
}
} else {
- throw new IllegalArgumentException("Function " + appliedFunction
- + " associated with source feed not found in Metadata.");
+ throw new IllegalArgumentException(
+ "Function " + appliedFunction + " associated with source feed not found in Metadata.");
}
}
return outputType;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/RemoteSocketMessageListener.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/RemoteSocketMessageListener.java
index f2211fc..5c5c068 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/RemoteSocketMessageListener.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/RemoteSocketMessageListener.java
@@ -90,7 +90,7 @@
char ch;
while (true) {
ch = (char) in.read();
- if (((int) ch) == -1) {
+ if ((ch) == -1) {
break;
}
while (ch != EOL) {
@@ -119,54 +119,6 @@
}
}
}
-
- }
-
- }
-
- private static class MessageParser implements Runnable {
-
- private Socket client;
- private IMessageAnalyzer messageAnalyzer;
- private static final char EOL = (char) "\n".getBytes()[0];
-
- public MessageParser(Socket client, IMessageAnalyzer messageAnalyzer) {
- this.client = client;
- this.messageAnalyzer = messageAnalyzer;
- }
-
- @Override
- public void run() {
- CharBuffer buffer = CharBuffer.allocate(5000);
- char ch;
- try {
- InputStream in = client.getInputStream();
- while (true) {
- ch = (char) in.read();
- if (((int) ch) == -1) {
- break;
- }
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
- }
- buffer.flip();
- String s = new String(buffer.array());
- synchronized (messageAnalyzer) {
- messageAnalyzer.getMessageQueue().add(s + "\n");
- }
- buffer.position(0);
- buffer.limit(5000);
- }
- } catch (IOException ioe) {
- ioe.printStackTrace();
- } finally {
- try {
- client.close();
- } catch (IOException ioe) {
- // do nothing
- }
- }
}
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/SocketMessageListener.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/SocketMessageListener.java
index 3824f95..ef1d242 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/SocketMessageListener.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/SocketMessageListener.java
@@ -41,14 +41,12 @@
private static final Logger LOGGER = Logger.getLogger(SocketMessageListener.class.getName());
- private final int port;
private final IMessageReceiver<String> messageReceiver;
private final MessageListenerServer listenerServer;
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public SocketMessageListener(int port, IMessageReceiver<String> messageReceiver) {
- this.port = port;
this.messageReceiver = messageReceiver;
this.listenerServer = new MessageListenerServer(port, messageReceiver);
}
@@ -128,7 +126,7 @@
char ch;
while (true) {
ch = (char) in.read();
- if (((int) ch) == -1) {
+ if ((ch) == -1) {
break;
}
while (ch != EOL) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
index 945305e..b626045 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
@@ -81,8 +81,8 @@
}
Dataset dataset = metadata.findDataset(dataverseName, datasetName);
if (dataset == null) {
- throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse "
- + dataverseName);
+ throw new AlgebricksException(
+ "Could not find dataset " + datasetName + " in dataverse " + dataverseName);
}
String tn = dataset.getItemTypeName();
IAType t2 = metadata.findType(dataverseName, tn);
@@ -162,8 +162,8 @@
}
Dataset dataset = metadata.findDataset(dataverseName, datasetName);
if (dataset == null) {
- throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse "
- + dataverseName);
+ throw new AlgebricksException(
+ "Could not find dataset " + datasetName + " in dataverse " + dataverseName);
}
String tn = dataset.getItemTypeName();
IAType t2 = metadata.findType(dataverseName, tn);
@@ -186,6 +186,6 @@
first = nameComponents[0];
second = nameComponents[1];
}
- return new Pair(first, second);
+ return new Pair<String, String>(first, second);
}
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/DatasetNameValueExtractor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/DatasetNameValueExtractor.java
index e79fe1c..e81da3f 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/DatasetNameValueExtractor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/DatasetNameValueExtractor.java
@@ -25,7 +25,6 @@
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.api.IValueExtractor;
import org.apache.asterix.om.base.AString;
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index eaef9be..3e29828 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -110,7 +110,7 @@
private final AtomicBoolean replicationSuspended;
private AtomicBoolean terminateJobsReplication;
private AtomicBoolean jobsReplicationSuspended;
- private final static int INITIAL_BUFFER_SIZE = 4000; //4KB
+ private final static int INITIAL_BUFFER_SIZE = 4000; //4KB
private final Set<String> shuttingDownReplicaIds;
//replication threads
private ReplicationJobsProccessor replicationJobsProcessor;
@@ -125,7 +125,7 @@
private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
protected ReplicationLogBuffer currentTxnLogBuffer;
private ReplicationLogFlusher txnlogsReplicator;
- private Future<Object> txnLogReplicatorTask;
+ private Future<? extends Object> txnLogReplicatorTask;
private Map<String, SocketChannel> logsReplicaSockets = null;
public ReplicationManager(String nodeId, AsterixReplicationProperties replicationProperties,
@@ -240,7 +240,7 @@
/**
* Processes the replication job based on its specifications
- *
+ *
* @param job
* The replication job
* @param replicasSockets
@@ -376,7 +376,7 @@
/**
* Waits and reads a response from a remote replica
- *
+ *
* @param socketChannel
* The socket to read the response from
* @param responseBuffer
@@ -425,7 +425,7 @@
/**
* Suspends proccessing replication jobs.
- *
+ *
* @param force
* a flag indicates if replication should be suspended right away or when the pending jobs are completed.
*/
@@ -523,7 +523,7 @@
/**
* Sends a shutdown event to remote replicas notifying them
* no more logs/files will be sent from this local replica.
- *
+ *
* @throws IOException
*/
private void sendShutdownNotifiction() throws IOException {
@@ -540,7 +540,7 @@
/**
* Sends a request to remote replicas
- *
+ *
* @param replicaSockets
* The sockets to send the request to.
* @param requestBuffer
@@ -571,7 +571,7 @@
/**
* Closes the passed replication sockets by sending GOODBYE request to remote replicas.
- *
+ *
* @param replicaSockets
*/
private void closeReplicaSockets(Map<String, SocketChannel> replicaSockets) {
@@ -602,7 +602,7 @@
/**
* Checks the state of a remote replica by trying to ping it.
- *
+ *
* @param replicaId
* The replica to check the state for.
* @param async
@@ -615,7 +615,7 @@
ReplicaStateChecker connector = new ReplicaStateChecker(replica, replicationProperties.getReplicationTimeOut(),
this, replicationProperties, suspendReplication);
- Future<Object> ft = asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector);
+ Future<? extends Object> ft = asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector);
if (!async) {
//wait until task is done
@@ -631,7 +631,7 @@
/**
* Updates the state of a remote replica.
- *
+ *
* @param replicaId
* The replica id to update.
* @param newState
@@ -686,7 +686,7 @@
/**
* When an ACK for a JOB_COMMIT is received, it is added to the corresponding job.
- *
+ *
* @param jobId
* @param replicaId
* The remote replica id the ACK received from.
@@ -763,7 +763,7 @@
/**
* Establishes a connection with a remote replica.
- *
+ *
* @param replicaId
* The replica to connect to.
* @return The socket of the remote replica
@@ -855,7 +855,7 @@
/**
* Suspends replications and sends a remote replica failure event to ReplicasEventsMonitor.
- *
+ *
* @param replicaId
* the failed replica id.
*/
@@ -1071,7 +1071,7 @@
logRecord.deserialize(dataBuffer, true, nodeId);
if (logRecord.getNodeId().equals(nodeId)) {
- //store log in memory to replay it for recovery
+ //store log in memory to replay it for recovery
recoveryLogs.add(logRecord);
//this needs to be a new log object so that it is passed to recovery manager as a different object
logRecord = new LogRecord();
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java
index ca6e452..f64c888 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java
@@ -20,7 +20,6 @@
import java.io.DataOutput;
-import org.apache.asterix.common.functions.FunctionConstants;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
@@ -44,6 +43,7 @@
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.EMPTY_STREAM;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new EmptyStreamAggregateDescriptor();
}
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/accessors/CircleCenterAccessor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/accessors/CircleCenterAccessor.java
index 348731d..d8c442a 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/accessors/CircleCenterAccessor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/accessors/CircleCenterAccessor.java
@@ -21,7 +21,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.functions.FunctionConstants;
import org.apache.asterix.dataflow.data.nontagged.Coordinate;
import org.apache.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/ABinaryBase64StringConstructorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/ABinaryBase64StringConstructorDescriptor.java
index 0120d6a..a589aa7 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/ABinaryBase64StringConstructorDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/ABinaryBase64StringConstructorDescriptor.java
@@ -30,24 +30,29 @@
import org.apache.hyracks.dataflow.common.data.parsers.ByteArrayBase64ParserFactory;
public class ABinaryBase64StringConstructorDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
+ private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
- @Override public IFunctionDescriptor createFunctionDescriptor() {
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
return new ABinaryBase64StringConstructorDescriptor();
}
};
- @Override public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ @Override
+ public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
- @Override public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
return new ABinaryHexStringConstructorDescriptor.ABinaryConstructorEvaluator(output, args[0],
ByteArrayBase64ParserFactory.INSTANCE);
}
};
}
- @Override public FunctionIdentifier getIdentifier() {
+ @Override
+ public FunctionIdentifier getIdentifier() {
return AsterixBuiltinFunctions.BINARY_BASE64_CONSTRUCTOR;
}
}
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/ABinaryHexStringConstructorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/ABinaryHexStringConstructorDescriptor.java
index ef54caf..7b38f50 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/ABinaryHexStringConstructorDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/ABinaryHexStringConstructorDescriptor.java
@@ -46,18 +46,19 @@
public class ABinaryHexStringConstructorDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new ABinaryHexStringConstructorDescriptor();
}
};
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output)
- throws AlgebricksException {
+ public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
return new ABinaryConstructorEvaluator(output, args[0], ByteArrayHexParserFactory.INSTANCE);
}
};
@@ -80,8 +81,7 @@
.getSerializerDeserializer(BuiltinType.ANULL);
public ABinaryConstructorEvaluator(final IDataOutputProvider output, ICopyEvaluatorFactory copyEvaluatorFactory,
- IValueParserFactory valueParserFactory)
- throws AlgebricksException {
+ IValueParserFactory valueParserFactory) throws AlgebricksException {
out = output.getDataOutput();
outInput = new ArrayBackedValueStorage();
eval = copyEvaluatorFactory.createEvaluator(outInput);
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractBinaryStringBoolEval.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractBinaryStringBoolEval.java
index c40cb95..0e454179 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractBinaryStringBoolEval.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractBinaryStringBoolEval.java
@@ -19,12 +19,10 @@
package org.apache.asterix.runtime.evaluators.functions;
import java.io.DataOutput;
-import java.util.Arrays;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PrefixLenJaccardDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PrefixLenJaccardDescriptor.java
index f07da63..8fc715e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PrefixLenJaccardDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PrefixLenJaccardDescriptor.java
@@ -51,6 +51,7 @@
private final static byte SER_FLOAT_TYPE_TAG = ATypeTag.FLOAT.serialize();
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new PrefixLenJaccardDescriptor();
}
@@ -71,7 +72,6 @@
private final ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private final ICopyEvaluator evalLen = args[0].createEvaluator(inputVal);
private final ICopyEvaluator evalThreshold = args[1].createEvaluator(inputVal);
- private final ArrayBackedValueStorage castBuffer = new ArrayBackedValueStorage();
private float similarityThresholdCache;
private SimilarityFiltersJaccard similarityFilters;
@@ -102,8 +102,7 @@
+ ": expects type FLOAT the first argument but got "
+ EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]));
}
- float similarityThreshold = (float) AFloatSerializerDeserializer.getFloat(
- inputVal.getByteArray(), 1);
+ float similarityThreshold = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
if (similarityThreshold != similarityThresholdCache || similarityFilters == null) {
similarityFilters = new SimilarityFiltersJaccard(similarityThreshold);
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEqualDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEqualDescriptor.java
index 312d28e..8cc3400 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEqualDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEqualDescriptor.java
@@ -30,12 +30,12 @@
import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import org.apache.hyracks.data.std.api.IDataOutputProvider;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public class StringEqualDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new StringEqualDescriptor();
}
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringLengthDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringLengthDescriptor.java
index 4f53a53..47ecb5b 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringLengthDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringLengthDescriptor.java
@@ -38,7 +38,6 @@
import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.data.std.api.IDataOutputProvider;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.util.string.UTF8StringUtil;
@@ -47,6 +46,7 @@
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new StringLengthDescriptor();
}
@@ -82,7 +82,7 @@
byte[] serString = outInput.getByteArray();
if (serString[0] == SER_STRING_TYPE_TAG) {
int len = UTF8StringUtil.getUTFLength(outInput.getByteArray(), 1);
- result.setValue((long) len);
+ result.setValue(len);
int64Serde.serialize(result, out);
} else if (serString[0] == SER_NULL_TYPE_TAG)
nullSerde.serialize(ANull.NULL, out);
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
index a6d9b57..1e676c9 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
@@ -39,6 +39,8 @@
import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
public class BinaryConcatDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
public IFunctionDescriptor createFunctionDescriptor() {
@@ -52,12 +54,12 @@
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output)
- throws AlgebricksException {
+ public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
return new AbstractCopyEvaluator(output, args) {
private final AsterixListAccessor listAccessor = new AsterixListAccessor();
@@ -69,8 +71,7 @@
ATypeTag typeTag = evaluateTuple(tuple, 0);
if (typeTag != ATypeTag.UNORDEREDLIST && typeTag != ATypeTag.ORDEREDLIST) {
throw new AlgebricksException(getIdentifier().getName()
- + ": expects input type ORDEREDLIST/UNORDEREDLIST, but got "
- + typeTag);
+ + ": expects input type ORDEREDLIST/UNORDEREDLIST, but got " + typeTag);
}
try {
listAccessor.reset(storages[0].getByteArray(), 0);
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryLengthDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryLengthDescriptor.java
index c42e054..fc6329d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryLengthDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryLengthDescriptor.java
@@ -41,6 +41,7 @@
public class BinaryLengthDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new BinaryLengthDescriptor();
}
@@ -48,11 +49,13 @@
private static final ATypeTag[] EXPECTED_TAGS = { ATypeTag.BINARY };
- @Override public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ @Override
+ public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
- @Override public ICopyEvaluator createEvaluator(final IDataOutputProvider output)
- throws AlgebricksException {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
return new AbstractCopyEvaluator(output, args) {
private AMutableInt64 result = new AMutableInt64(0);
@@ -60,7 +63,8 @@
private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.AINT64);
- @Override public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ @Override
+ public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
ATypeTag tag = evaluateTuple(tuple, 0);
try {
if (serializeNullIfAnyNull(tag)) {
@@ -80,7 +84,8 @@
};
}
- @Override public FunctionIdentifier getIdentifier() {
+ @Override
+ public FunctionIdentifier getIdentifier() {
return AsterixBuiltinFunctions.BINARY_LENGTH;
}
}
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/FindBinaryDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/FindBinaryDescriptor.java
index af0aaba..4a30847 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/FindBinaryDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/FindBinaryDescriptor.java
@@ -39,7 +39,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class FindBinaryDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
+ private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
public IFunctionDescriptor createFunctionDescriptor() {
@@ -55,12 +55,12 @@
private static final ATypeTag[] EXPECTED_INPUT_TAG = { ATypeTag.BINARY, ATypeTag.BINARY };
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output)
- throws AlgebricksException {
+ public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
return new AbstractFindBinaryCopyEvaluator(output, args, getIdentifier().getName()) {
@Override
protected int getFromOffset(IFrameTupleReference tuple) throws AlgebricksException {
@@ -98,15 +98,13 @@
if (serializeNullIfAnyNull(textTag, wordTag)) {
return;
}
- checkTypeMachingThrowsIfNot(functionName, EXPECTED_INPUT_TAG, textTag,
- wordTag);
+ checkTypeMachingThrowsIfNot(functionName, EXPECTED_INPUT_TAG, textTag, wordTag);
textPtr.set(storages[0].getByteArray(), 1, storages[0].getLength() - 1);
wordPtr.set(storages[1].getByteArray(), 1, storages[1].getLength() - 1);
- result.setValue(
- 1 + indexOf(textPtr.getByteArray(), textPtr.getContentStartOffset(), textPtr.getContentLength(),
- wordPtr.getByteArray(), wordPtr.getContentStartOffset(), wordPtr.getContentLength(),
- fromOffset));
+ result.setValue(1 + indexOf(textPtr.getByteArray(), textPtr.getContentStartOffset(),
+ textPtr.getContentLength(), wordPtr.getByteArray(), wordPtr.getContentStartOffset(),
+ wordPtr.getContentLength(), fromOffset));
intSerde.serialize(result, dataOutput);
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
@@ -117,9 +115,8 @@
}
// copy from String.indexOf(String)
- static int indexOf(byte[] source, int sourceOffset, int sourceCount,
- byte[] target, int targetOffset, int targetCount,
- int fromIndex) {
+ static int indexOf(byte[] source, int sourceOffset, int sourceCount, byte[] target, int targetOffset,
+ int targetCount, int fromIndex) {
if (fromIndex >= sourceCount) {
return (targetCount == 0 ? sourceCount : -1);
}
@@ -136,17 +133,14 @@
for (int i = sourceOffset + fromIndex; i <= max; i++) {
/* Look for first character. */
if (source[i] != first) {
- while (++i <= max && source[i] != first)
- ;
+ while (++i <= max && source[i] != first);
}
/* Found first character, now look at the rest of v2 */
if (i <= max) {
int j = i + 1;
int end = j + targetCount - 1;
- for (int k = targetOffset + 1; j < end && source[j]
- == target[k]; j++, k++)
- ;
+ for (int k = targetOffset + 1; j < end && source[j] == target[k]; j++, k++);
if (j == end) {
/* Found whole string. */
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/FindBinaryFromDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/FindBinaryFromDescriptor.java
index 753b072..12f4486 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/FindBinaryFromDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/FindBinaryFromDescriptor.java
@@ -22,7 +22,6 @@
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
-import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -34,6 +33,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class FindBinaryFromDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
@@ -50,13 +50,15 @@
@Override
public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
@Override
public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new FindBinaryDescriptor.AbstractFindBinaryCopyEvaluator(output, args, getIdentifier().getName()) {
+ return new FindBinaryDescriptor.AbstractFindBinaryCopyEvaluator(output, args,
+ getIdentifier().getName()) {
@Override
protected int getFromOffset(IFrameTupleReference tuple) throws AlgebricksException {
- ATypeTag offsetTag = evaluateTuple(tuple, 2);
-
+ evaluateTuple(tuple, 2);
int getFrom = 0;
try {
getFrom = ATypeHierarchy.getIntegerValue(storages[2].getByteArray(), 0);
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/ParseBinaryDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/ParseBinaryDescriptor.java
index ffe2c9a..8adb21c 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/ParseBinaryDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/ParseBinaryDescriptor.java
@@ -41,7 +41,7 @@
import org.apache.hyracks.util.bytes.HexParser;
public class ParseBinaryDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
+ private static final long serialVersionUID = 1L;
static final UTF8StringPointable HEX_FORMAT = UTF8StringPointable.generateUTF8Pointable("hex");
static final UTF8StringPointable BASE64_FORMAT = UTF8StringPointable.generateUTF8Pointable("base64");
@@ -60,12 +60,12 @@
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output)
- throws AlgebricksException {
+ public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
return new AbstractCopyEvaluator(output, args) {
@SuppressWarnings("unchecked")
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/PrintBinaryDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/PrintBinaryDescriptor.java
index 75bb628..e6de4ce 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/PrintBinaryDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/PrintBinaryDescriptor.java
@@ -43,7 +43,7 @@
import org.apache.hyracks.util.string.UTF8StringWriter;
public class PrintBinaryDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
+ private static final long serialVersionUID = 1L;
private static final byte SER_STRING_BYTE = ATypeTag.STRING.serialize();
@Override
@@ -61,12 +61,12 @@
public final static ATypeTag[] EXPECTED_INPUT_TAGS = { ATypeTag.BINARY, ATypeTag.STRING };
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output)
- throws AlgebricksException {
+ public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
return new AbstractCopyEvaluator(output, args) {
private StringBuilder stringBuilder = new StringBuilder();
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/SubBinaryFromDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/SubBinaryFromDescriptor.java
index a5b5e99..b99a15b 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/SubBinaryFromDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/SubBinaryFromDescriptor.java
@@ -28,29 +28,33 @@
import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import org.apache.hyracks.data.std.api.IDataOutputProvider;
-import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SubBinaryFromDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
+ private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
- @Override public IFunctionDescriptor createFunctionDescriptor() {
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
return new SubBinaryFromDescriptor();
}
};
- @Override public FunctionIdentifier getIdentifier() {
+ @Override
+ public FunctionIdentifier getIdentifier() {
return AsterixBuiltinFunctions.SUBBINARY_FROM;
}
- @Override public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ @Override
+ public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
- @Override public ICopyEvaluator createEvaluator(final IDataOutputProvider output)
- throws AlgebricksException {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
return new SubBinaryFromToDescriptor.AbstractSubBinaryCopyEvaluator(output, args,
getIdentifier().getName()) {
- @Override protected int getSubLength(IFrameTupleReference tuple) throws AlgebricksException {
+ @Override
+ protected int getSubLength(IFrameTupleReference tuple) throws AlgebricksException {
return Integer.MAX_VALUE;
}
};
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/SubBinaryFromToDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/SubBinaryFromToDescriptor.java
index 4f35de8..34e527e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/SubBinaryFromToDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/SubBinaryFromToDescriptor.java
@@ -38,7 +38,7 @@
import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
public class SubBinaryFromToDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
+ private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
public IFunctionDescriptor createFunctionDescriptor() {
@@ -54,12 +54,14 @@
@Override
public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopyEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
@Override
public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
return new AbstractSubBinaryCopyEvaluator(output, args, getIdentifier().getName()) {
@Override
protected int getSubLength(IFrameTupleReference tuple) throws AlgebricksException {
- ATypeTag tagSubLength = evaluateTuple(tuple, 2);
+ evaluateTuple(tuple, 2);
int subLength = 0;
try {
subLength = ATypeHierarchy.getIntegerValue(storages[2].getByteArray(), 0);
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
index 327c851..6e51311 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
@@ -29,7 +29,6 @@
import org.apache.asterix.builders.IAsterixListBuilder;
import org.apache.asterix.builders.ListBuilderFactory;
import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.builders.RecordBuilderFactory;
import org.apache.asterix.builders.UnorderedListBuilder;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -156,8 +155,8 @@
}
}
- protected boolean parseAdmInstance(IAType objectType, boolean datasetRec, DataOutput out) throws AsterixException,
- IOException, AdmLexerException {
+ protected boolean parseAdmInstance(IAType objectType, boolean datasetRec, DataOutput out)
+ throws AsterixException, IOException, AdmLexerException {
int token = admLexer.next();
if (token == AdmLexer.TOKEN_EOF) {
return false;
@@ -525,8 +524,8 @@
return getTargetTypeTag(expectedTypeTag, aObjectType) != null;
}
- private void parseRecord(ARecordType recType, DataOutput out, Boolean datasetRec) throws IOException,
- AsterixException, AdmLexerException {
+ private void parseRecord(ARecordType recType, DataOutput out, Boolean datasetRec)
+ throws IOException, AsterixException, AdmLexerException {
ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
@@ -592,8 +591,8 @@
openRecordField = false;
}
} else {
- aStringFieldName.setValue(admLexer.getLastTokenImage().substring(1,
- admLexer.getLastTokenImage().length() - 1));
+ aStringFieldName.setValue(
+ admLexer.getLastTokenImage().substring(1, admLexer.getLastTokenImage().length() - 1));
stringSerde.serialize(aStringFieldName, fieldNameBuffer.getDataOutput());
openRecordField = true;
fieldType = null;
@@ -674,8 +673,8 @@
return -1;
}
- private void parseOrderedList(AOrderedListType oltype, DataOutput out) throws IOException, AsterixException,
- AdmLexerException {
+ private void parseOrderedList(AOrderedListType oltype, DataOutput out)
+ throws IOException, AsterixException, AdmLexerException {
ArrayBackedValueStorage itemBuffer = getTempBuffer();
OrderedListBuilder orderedListBuilder = (OrderedListBuilder) getOrderedListBuilder();
@@ -716,8 +715,8 @@
orderedListBuilder.write(out, true);
}
- private void parseUnorderedList(AUnorderedListType uoltype, DataOutput out) throws IOException, AsterixException,
- AdmLexerException {
+ private void parseUnorderedList(AUnorderedListType uoltype, DataOutput out)
+ throws IOException, AsterixException, AdmLexerException {
ArrayBackedValueStorage itemBuffer = getTempBuffer();
UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
@@ -779,8 +778,8 @@
return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY);
}
- private void parseToBinaryTarget(int lexerToken, String tokenImage, DataOutput out) throws ParseException,
- HyracksDataException {
+ private void parseToBinaryTarget(int lexerToken, String tokenImage, DataOutput out)
+ throws ParseException, HyracksDataException {
switch (lexerToken) {
case AdmLexer.TOKEN_HEX_CONS: {
parseHexBinaryString(tokenImage.toCharArray(), 1, tokenImage.length() - 2, out);
@@ -793,16 +792,16 @@
}
}
- private void parseToNumericTarget(ATypeTag typeTag, IAType objectType, DataOutput out) throws AsterixException,
- IOException {
+ private void parseToNumericTarget(ATypeTag typeTag, IAType objectType, DataOutput out)
+ throws AsterixException, IOException {
final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
if (targetTypeTag == null || !parseValue(admLexer.getLastTokenImage(), targetTypeTag, out)) {
throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag);
}
}
- private void parseAndCastNumeric(ATypeTag typeTag, IAType objectType, DataOutput out) throws AsterixException,
- IOException {
+ private void parseAndCastNumeric(ATypeTag typeTag, IAType objectType, DataOutput out)
+ throws AsterixException, IOException {
final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
DataOutput dataOutput = out;
if (targetTypeTag != typeTag) {
@@ -820,7 +819,8 @@
// can promote typeTag to targetTypeTag
ITypeConvertComputer promoteComputer = ATypeHierarchy.getTypePromoteComputer(typeTag, targetTypeTag);
if (promoteComputer == null) {
- throw new AsterixException("Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
+ throw new AsterixException(
+ "Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
}
// do the promotion; note that the type tag field should be skipped
promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
@@ -829,7 +829,8 @@
//can demote source type to the target type
ITypeConvertComputer demoteComputer = ATypeHierarchy.getTypeDemoteComputer(typeTag, targetTypeTag);
if (demoteComputer == null) {
- throw new AsterixException("Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
+ throw new AsterixException(
+ "Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
}
// do the demotion; note that the type tag field should be skipped
demoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
@@ -838,8 +839,8 @@
}
}
- private void parseConstructor(ATypeTag typeTag, IAType objectType, DataOutput out) throws AsterixException,
- AdmLexerException, IOException {
+ private void parseConstructor(ATypeTag typeTag, IAType objectType, DataOutput out)
+ throws AsterixException, AdmLexerException, IOException {
final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
if (targetTypeTag != null) {
DataOutput dataOutput = out;
@@ -876,8 +877,8 @@
throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + ". Got " + typeTag + " instead.");
}
- private boolean parseValue(final String unquoted, ATypeTag typeTag, DataOutput out) throws AsterixException,
- HyracksDataException, IOException {
+ private boolean parseValue(final String unquoted, ATypeTag typeTag, DataOutput out)
+ throws AsterixException, HyracksDataException, IOException {
switch (typeTag) {
case BOOLEAN:
parseBoolean(unquoted, out);
@@ -1008,8 +1009,8 @@
for (; offset < int16.length(); offset++) {
if (int16.charAt(offset) >= '0' && int16.charAt(offset) <= '9') {
value = (short) (value * 10 + int16.charAt(offset) - '0');
- } else if (int16.charAt(offset) == 'i' && int16.charAt(offset + 1) == '1'
- && int16.charAt(offset + 2) == '6' && offset + 3 == int16.length()) {
+ } else if (int16.charAt(offset) == 'i' && int16.charAt(offset + 1) == '1' && int16.charAt(offset + 2) == '6'
+ && offset + 3 == int16.length()) {
break;
} else {
throw new ParseException(errorMessage);
@@ -1040,8 +1041,8 @@
for (; offset < int32.length(); offset++) {
if (int32.charAt(offset) >= '0' && int32.charAt(offset) <= '9') {
value = (value * 10 + int32.charAt(offset) - '0');
- } else if (int32.charAt(offset) == 'i' && int32.charAt(offset + 1) == '3'
- && int32.charAt(offset + 2) == '2' && offset + 3 == int32.length()) {
+ } else if (int32.charAt(offset) == 'i' && int32.charAt(offset + 1) == '3' && int32.charAt(offset + 2) == '2'
+ && offset + 3 == int32.length()) {
break;
} else {
throw new ParseException(errorMessage);
@@ -1073,8 +1074,8 @@
for (; offset < int64.length(); offset++) {
if (int64.charAt(offset) >= '0' && int64.charAt(offset) <= '9') {
value = (value * 10 + int64.charAt(offset) - '0');
- } else if (int64.charAt(offset) == 'i' && int64.charAt(offset + 1) == '6'
- && int64.charAt(offset + 2) == '4' && offset + 3 == int64.length()) {
+ } else if (int64.charAt(offset) == 'i' && int64.charAt(offset + 1) == '6' && int64.charAt(offset + 2) == '4'
+ && offset + 3 == int64.length()) {
break;
} else {
throw new ParseException(errorMessage);
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
index 4b10560..4ce8ee8 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
@@ -112,6 +112,7 @@
@SuppressWarnings("unchecked")
protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ASTRING);
+ @SuppressWarnings("unchecked")
protected ISerializerDeserializer<ABinary> binarySerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ABINARY);
@SuppressWarnings("unchecked")
@@ -229,8 +230,8 @@
}
}
chrononTimeInMs = ADateParserFactory.parseDatePart(datetime, 0, timeOffset);
- chrononTimeInMs += ATimeParserFactory.parseTimePart(datetime, timeOffset + 1, datetime.length()
- - timeOffset - 1);
+ chrononTimeInMs += ATimeParserFactory.parseTimePart(datetime, timeOffset + 1,
+ datetime.length() - timeOffset - 1);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapter.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapter.java
index 2c83a20..e537ef7 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapter.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapter.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.feeds.api.IFeedAdapter;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SocketClientAdapter implements IFeedAdapter {
@@ -43,14 +42,11 @@
private final int port;
- private final IHyracksTaskContext ctx;
-
private boolean continueStreaming = true;
- public SocketClientAdapter(Integer port, String localFile, IHyracksTaskContext ctx) {
+ public SocketClientAdapter(Integer port, String localFile) {
this.localFile = localFile;
this.port = port;
- this.ctx = ctx;
}
@Override
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
index 6ee6401..4234a88 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
@@ -27,7 +27,6 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SocketClientAdapterFactory implements IFeedAdapterFactory {
@@ -72,7 +71,7 @@
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
Pair<String, Integer> socket = genericSocketAdapterFactory.getSockets().get(partition);
- return new SocketClientAdapter(socket.second, fileSplits[partition], ctx);
+ return new SocketClientAdapter(socket.second, fileSplits[partition]);
}
@Override
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index adce491..fffbc17 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -23,8 +23,6 @@
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 5a01920..e9004c4 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -33,7 +33,6 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
@@ -78,8 +77,8 @@
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- String ingestionCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
- String ingestionLocationParam = (String) configuration.get(KEY_INGESTION_LOCATIONS);
+ String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
+ String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
String[] locations = null;
if (ingestionLocationParam != null) {
locations = ingestionLocationParam.split(",");
@@ -90,8 +89,8 @@
}
List<String> chosenLocations = new ArrayList<String>();
- String[] availableLocations = locations != null ? locations : AsterixClusterProperties.INSTANCE
- .getParticipantNodes().toArray(new String[] {});
+ String[] availableLocations = locations != null ? locations
+ : AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(new String[] {});
for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
chosenLocations.add(availableLocations[k]);
}
@@ -113,10 +112,12 @@
return InputDataFormat.ADM;
}
+ @Override
public boolean isRecordTrackingEnabled() {
return false;
}
+ @Override
public IIntakeProgressTracker createIntakeProgressTracker() {
throw new UnsupportedOperationException("Tracking of ingested records not enabled");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 6646ffb..a07a109 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -22,11 +22,10 @@
import org.apache.asterix.common.transactions.AbstractOperationCallback;
import org.apache.asterix.common.transactions.ILockManager;
import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -59,8 +58,7 @@
logRecord.setNodeId(txnSubsystem.getId());
}
- protected void log(int PKHash, ITupleReference newValue)
- throws ACIDException {
+ protected void log(int PKHash, ITupleReference newValue) throws ACIDException {
logRecord.setPKHashValue(PKHash);
logRecord.setPKFields(primaryKeyFields);
logRecord.setPKValue(newValue);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index e6e2b29..88e95dd 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -23,7 +23,6 @@
import java.util.Map;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index af508f5..e268134 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -19,6 +19,15 @@
package org.apache.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ILockManager;
@@ -27,18 +36,6 @@
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
/**
* A concurrent implementation of the ILockManager interface.
*
@@ -79,11 +76,11 @@
static LockAction[][] ACTION_MATRIX = {
// new NL IS IX S X
- {LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD}, // NL
- {LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT}, // IS
- {LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT}, // IX
- {LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT}, // S
- {LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT} // X
+ { LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
+ { LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
+ { LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
+ { LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
+ { LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
};
public ConcurrentLockManager(final int lockManagerShrinkTimer) throws ACIDException {
@@ -91,14 +88,15 @@
// TODO increase table size?
}
- public ConcurrentLockManager(final int lockManagerShrinkTimer, final int noArenas, final int tableSize) throws
- ACIDException {
+ public ConcurrentLockManager(final int lockManagerShrinkTimer, final int noArenas, final int tableSize)
+ throws ACIDException {
this.table = new ResourceGroupTable(tableSize);
resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer);
reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer);
jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer);
jobIdSlotMap = new ConcurrentHashMap<>();
dsLockCache = new ThreadLocal<DatasetLockCache>() {
+ @Override
protected DatasetLockCache initialValue() {
return new DatasetLockCache();
}
@@ -176,8 +174,7 @@
}
private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
- final LockAction act, ITransactionContext txnContext) throws ACIDException,
- InterruptedException {
+ final LockAction act, ITransactionContext txnContext) throws ACIDException, InterruptedException {
final Queue queue = act.modify ? upgrader : waiter;
if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
DeadlockTracker tracker = new CollectingTracker();
@@ -207,15 +204,19 @@
static class NOPTracker implements DeadlockTracker {
static final DeadlockTracker INSTANCE = new NOPTracker();
+ @Override
public void pushResource(long resSlot) {
}
+ @Override
public void pushRequest(long reqSlot) {
}
+ @Override
public void pushJob(long jobSlot) {
}
+ @Override
public void pop() {
}
}
@@ -231,26 +232,30 @@
public void pushResource(long resSlot) {
types.add("Resource");
slots.add(resSlot);
- if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ if (DEBUG)
+ System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
}
@Override
public void pushRequest(long reqSlot) {
types.add("Request");
slots.add(reqSlot);
- if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ if (DEBUG)
+ System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
}
@Override
public void pushJob(long jobSlot) {
types.add("Job");
slots.add(jobSlot);
- if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ if (DEBUG)
+ System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
}
@Override
public void pop() {
- if (DEBUG) System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ if (DEBUG)
+ System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
types.remove(types.size() - 1);
slots.remove(slots.size() - 1);
}
@@ -270,16 +275,18 @@
* cycle in the wait-graph where the job waits on itself - but not directly on itself (which happens e.g. in the
* case of upgrading a lock from S to X).
*
- * @param resSlot the slot that contains the information about the resource
- * @param jobSlot the slot that contains the information about the job
+ * @param resSlot
+ * the slot that contains the information about the resource
+ * @param jobSlot
+ * the slot that contains the information about the job
* @return true if a cycle would be introduced, false otherwise
*/
private boolean introducesDeadlock(final long resSlot, final long jobSlot, final DeadlockTracker tracker) {
return introducesDeadlock(resSlot, jobSlot, tracker, 0);
}
- private boolean introducesDeadlock(final long resSlot, final long jobSlot,
- final DeadlockTracker tracker, final int depth) {
+ private boolean introducesDeadlock(final long resSlot, final long jobSlot, final DeadlockTracker tracker,
+ final int depth) {
synchronized (jobArenaMgr) {
tracker.pushResource(resSlot);
long reqSlot = resArenaMgr.getLastHolder(resSlot);
@@ -340,7 +347,7 @@
if (group.firstResourceIndex.get() == -1l) {
validateJob(txnContext);
// if we do not have a resource in the group, we know that the
- // resource that we are looking for is not locked
+ // resource that we are looking for is not locked
return;
}
@@ -447,7 +454,7 @@
@Override
public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
- ITransactionContext txnContext) throws ACIDException {
+ ITransactionContext txnContext) throws ACIDException {
log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
stats.instantTryLock();
@@ -466,7 +473,7 @@
if (group.firstResourceIndex.get() == -1l) {
validateJob(txnContext);
// if we do not have a resource in the group, we know that the
- // resource that we are looking for is not locked
+ // resource that we are looking for is not locked
return true;
}
@@ -630,13 +637,13 @@
resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
group.firstResourceIndex.set(resSlot);
if (DEBUG_MODE) {
- LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " +
- entityHashValue + ")");
+ LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue
+ + ")");
}
} else {
if (DEBUG_MODE) {
- LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " +
- entityHashValue + ")");
+ LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue
+ + ")");
}
}
return resSlot;
@@ -670,9 +677,12 @@
* b) acquire the lock if we want to lock the same resource with the same
* lock mode for the same job.
*
- * @param resource the resource slot that's being locked
- * @param job the job slot of the job locking the resource
- * @param lockMode the lock mode that the resource should be locked with
+ * @param resource
+ * the resource slot that's being locked
+ * @param job
+ * the job slot of the job locking the resource
+ * @param lockMode
+ * the lock mode that the resource should be locked with
* @return
*/
private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
@@ -697,7 +707,7 @@
stats.logCounters(LOGGER, Level.INFO, false);
long resSlot = group.firstResourceIndex.get();
while (resSlot != -1) {
- // either we already have a lock on this resource or we have a
+ // either we already have a lock on this resource or we have a
// hash collision
if (resArenaMgr.getDatasetId(resSlot) == dsId && resArenaMgr.getPkHashVal(resSlot) == entityHashValue) {
return resSlot;
@@ -779,6 +789,7 @@
}
final Queue waiter = new Queue() {
+ @Override
public void add(long request, long resource, long job) {
long waiter = resArenaMgr.getFirstWaiter(resource);
reqArenaMgr.setNextRequest(request, -1);
@@ -794,6 +805,7 @@
}
}
+ @Override
public void remove(long request, long resource, long job) {
long waiter = resArenaMgr.getFirstWaiter(resource);
if (waiter == request) {
@@ -811,6 +823,7 @@
};
final Queue upgrader = new Queue() {
+ @Override
public void add(long request, long resource, long job) {
long upgrader = resArenaMgr.getFirstUpgrader(resource);
reqArenaMgr.setNextRequest(request, -1);
@@ -826,6 +839,7 @@
}
}
+ @Override
public void remove(long request, long resource, long job) {
long upgrader = resArenaMgr.getFirstUpgrader(resource);
if (upgrader == request) {
@@ -882,9 +896,12 @@
* If the value of the parameter lockMode is LockMode.ANY the first request
* for the job is removed - independent of the LockMode.
*
- * @param head the head of the request queue
- * @param jobSlot the job slot
- * @param lockMode the lock mode
+ * @param head
+ * the head of the request queue
+ * @param jobSlot
+ * the job slot
+ * @param lockMode
+ * the lock mode
* @return the slot of the first request that matched the given job
*/
private long removeRequestFromQueueForJob(long head, long jobSlot, byte lockMode) {
@@ -945,8 +962,8 @@
private void requestAbort(ITransactionContext txnContext, String msg) throws ACIDException {
txnContext.setTimeout(true);
- throw new ACIDException("Transaction " + txnContext.getJobId()
- + " should abort (requested by the Lock Manager)" + ":\n" + msg);
+ throw new ACIDException(
+ "Transaction " + txnContext.getJobId() + " should abort (requested by the Lock Manager)" + ":\n" + msg);
}
/*
@@ -1067,6 +1084,7 @@
return getResourceTablePrinter().append(new StringBuilder()).append("\n").toString();
}
+ @Override
public String toString() {
return printByResource();
}
@@ -1101,7 +1119,7 @@
private static class DatasetLockCache {
private long jobId = -1;
private HashMap<Integer, Byte> lockCache = new HashMap<Integer, Byte>();
- // size 1 cache to avoid the boxing/unboxing that comes with the
+ // size 1 cache to avoid the boxing/unboxing that comes with the
// access to the HashMap
private int cDsId = -1;
private byte cDsLockMode = -1;
@@ -1133,10 +1151,10 @@
this.lockCache.put(dsId, dsLockMode);
}
+ @Override
public String toString() {
return "[ " + jobId + " : " + lockCache.toString() + "]";
}
}
}
-
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index 1af2200..eab8f46 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -23,9 +23,6 @@
import java.util.ArrayList;
import java.util.Random;
-import org.apache.commons.io.FileUtils;
-
-import org.apache.asterix.common.api.AsterixThreadExecutor;
import org.apache.asterix.common.config.AsterixPropertiesAccessor;
import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -39,6 +36,7 @@
import org.apache.asterix.transaction.management.service.transaction.TransactionContext;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.commons.io.FileUtils;
/**
* LockManagerUnitTest: unit test of LockManager
@@ -60,7 +58,8 @@
//prepare configuration file
File cwd = new File(System.getProperty("user.dir"));
File asterixdbDir = cwd.getParentFile();
- File srcFile = new File(asterixdbDir.getAbsoluteFile(), "asterix-app/src/main/resources/asterix-build-configuration.xml");
+ File srcFile = new File(asterixdbDir.getAbsoluteFile(),
+ "asterix-app/src/main/resources/asterix-build-configuration.xml");
File destFile = new File(cwd, "target/classes/asterix-configuration.xml");
FileUtils.copyFile(srcFile, destFile);
@@ -384,8 +383,8 @@
int datasetId = lockRequest.datasetIdObj.getId();
int entityHashValue = lockRequest.entityHashValue;
byte lockMode = LockMode.X;
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, new DatasetId(
- datasetId), entityHashValue, lockMode, txnContext);
+ LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType,
+ new DatasetId(datasetId), entityHashValue, lockMode, txnContext);
request.isUpgrade = true;
requestQueue.add(request);
requestHistory.append(request.prettyPrint());
@@ -417,8 +416,8 @@
int datasetId = lockRequest.datasetIdObj.getId();
int entityHashValue = lockRequest.entityHashValue;
byte lockMode = lockRequest.lockMode;
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, new DatasetId(
- datasetId), entityHashValue, lockMode, txnContext);
+ LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType,
+ new DatasetId(datasetId), entityHashValue, lockMode, txnContext);
requestQueue.add(request);
requestHistory.append(request.prettyPrint());
sendRequest(request);
@@ -449,8 +448,8 @@
int datasetId = lockRequest.datasetIdObj.getId();
int entityHashValue = lockRequest.entityHashValue;
byte lockMode = lockRequest.lockMode;
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, new DatasetId(
- datasetId), entityHashValue, lockMode, txnContext);
+ LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType,
+ new DatasetId(datasetId), entityHashValue, lockMode, txnContext);
requestQueue.add(request);
requestHistory.append(request.prettyPrint());
sendRequest(request);
@@ -523,7 +522,7 @@
throw new UnsupportedOperationException("Unsupported lock method");
}
try {
- Thread.sleep((long) 0);
+ Thread.sleep(0);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -577,7 +576,7 @@
public String toString() {
return prettyPrint();
}
-
+
public String prettyPrint() {
StringBuilder s = new StringBuilder();
//s.append(threadName.charAt(7)).append("\t").append("\t");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
index 90c1f69..eff9e21 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
@@ -19,9 +19,7 @@
package org.apache.asterix.transaction.management.service.locking;
-import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
-
-import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
/**
* Creates a JSON serialization of the lock table of the ConcurrentLockManager organized by resource. I.e. the
@@ -36,16 +34,15 @@
private RequestArenaManager reqArenaMgr;
private JobArenaManager jobArenaMgr;
- ResourceTablePrinter(ResourceGroupTable table,
- ResourceArenaManager resArenaMgr,
- RequestArenaManager reqArenaMgr,
- JobArenaManager jobArenaMgr) {
+ ResourceTablePrinter(ResourceGroupTable table, ResourceArenaManager resArenaMgr, RequestArenaManager reqArenaMgr,
+ JobArenaManager jobArenaMgr) {
this.table = table;
this.resArenaMgr = resArenaMgr;
this.reqArenaMgr = reqArenaMgr;
this.jobArenaMgr = jobArenaMgr;
}
+ @Override
public StringBuilder append(StringBuilder sb) {
table.getAllLatches();
sb.append("[\n");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java
index 8171f77..a4a5ad6 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java
@@ -22,6 +22,8 @@
import org.apache.asterix.common.transactions.ITransactionContext;
public class WaitInterruptedException extends ACIDException {
+ private static final long serialVersionUID = 1L;
+
public WaitInterruptedException(ITransactionContext txnContext, String message, Throwable cause) {
super(txnContext, message, cause);
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
index f8e0253..28f21e5 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
-import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ILogRecord.RECORD_STATUS;
import org.apache.asterix.common.transactions.LogRecord;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 691a4d8..84576b5 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -69,7 +69,7 @@
private FileChannel appendChannel;
protected LogBuffer appendPage;
private LogFlusher logFlusher;
- private Future<Object> futureLogFlusher;
+ private Future<? extends Object> futureLogFlusher;
private static final long SMALLEST_LOG_FILE_ID = 0;
private final String nodeId;
protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
@@ -190,7 +190,7 @@
appendChannel = getFileChannel(appendLSN.get(), true);
appendPage.isLastPage(true);
//[Notice]
- //the current log file channel is closed if
+ //the current log file channel is closed if
//LogBuffer.flush() completely flush the last page of the file.
}
@@ -313,6 +313,7 @@
initializeLogManager(lastMaxLogFileId + 1);
}
+ @Override
public void deleteOldLogFiles(long checkpointLSN) {
Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
@@ -376,6 +377,7 @@
List<Long> logFileIds = null;
if (fileLogDir.exists()) {
logFileNames = fileLogDir.list(new FilenameFilter() {
+ @Override
public boolean accept(File dir, String name) {
if (name.startsWith(logFilePrefix)) {
return true;
@@ -448,6 +450,7 @@
return newFileChannel;
}
+ @Override
public long getReadableSmallestLSN() {
List<Long> logFileIds = getLogFileIds();
if (logFileIds != null) {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index bc644841..ac37a08 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -43,12 +43,11 @@
* An object of TransactionContext is created and accessed(read/written) by multiple threads which work for
* a single job identified by a jobId. Thus, the member variables in the object can be read/written
* concurrently. Please see each variable declaration to know which one is accessed concurrently and
- * which one is not.
+ * which one is not.
*/
public class TransactionContext implements ITransactionContext, Serializable {
private static final long serialVersionUID = -6105616785783310111L;
- private final TransactionSubsystem transactionSubsystem;
// jobId is set once and read concurrently.
private final JobId jobId;
@@ -99,7 +98,6 @@
// moment.
public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem) throws ACIDException {
this.jobId = jobId;
- this.transactionSubsystem = transactionSubsystem;
firstLSN = new AtomicLong(-1);
lastLSN = new AtomicLong(-1);
txnState = new AtomicInteger(ITransactionManager.ACTIVE);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystemProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystemProvider.java
deleted file mode 100644
index ea3407b..0000000
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystemProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.transaction.management.service.transaction;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * The purpose of this provider is to work around a cyclic dependency between asterix-common and asterix-transactions.
- * The operation callbacks would depend on the AsterixAppRuntimeContext to get the transaction subsystem,
- * while at the same time the AsterixAppRuntimeContext depends on asterix-transactions for the TransactionSubsystem.
- */
-public class TransactionSubsystemProvider implements ITransactionSubsystemProvider {
- @Override
- public ITransactionSubsystem getTransactionSubsystem(IHyracksTaskContext ctx) {
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- return runtimeCtx.getTransactionSubsystem();
- }
-}
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/HDFSBackup.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/HDFSBackup.java
index 47c1d41..5dc812a 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/HDFSBackup.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/HDFSBackup.java
@@ -46,14 +46,14 @@
HDFSBackup back = new HDFSBackup();
Map<String, String> envs = System.getenv();
- if(envs.containsKey("HADOOP_CONF_DIR")){
+ if (envs.containsKey("HADOOP_CONF_DIR")) {
File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
- if(hadoopConfDir.isDirectory()){
- for(File config: hadoopConfDir.listFiles()){
- if(config.getName().matches("^.*(xml)$")){
- back.conf.addResource(new Path(config.getAbsolutePath()));
- }
- }
+ if (hadoopConfDir.isDirectory()) {
+ for (File config : hadoopConfDir.listFiles()) {
+ if (config.getName().matches("^.*(xml)$")) {
+ back.conf.addResource(new Path(config.getAbsolutePath()));
+ }
+ }
}
}
Options opts = new Options();
@@ -67,7 +67,7 @@
back.backup = true;
}
@SuppressWarnings("unchecked")
- List<String> pairs = (List<String>) cliParser.getArgList();
+ List<String> pairs = cliParser.getArgList();
List<Path[]> sources = new ArrayList<Path[]>(10);
for (String p : pairs) {
@@ -83,7 +83,7 @@
back.performRestore(sources);
}
} catch (IOException e) {
- back.LOG.fatal("Backup/restoration unsuccessful: " + e.getMessage());
+ LOG.fatal("Backup/restoration unsuccessful: " + e.getMessage());
throw e;
}
}
diff --git a/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java b/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
index cb85254..cd49300 100644
--- a/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
+++ b/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
@@ -21,8 +21,11 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
-import java.util.logging.Logger;
+import org.apache.asterix.aoya.AsterixYARNClient;
+import org.apache.asterix.aoya.Utils;
+import org.apache.asterix.event.schema.yarnCluster.Cluster;
+import org.apache.asterix.event.schema.yarnCluster.Node;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -31,11 +34,6 @@
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.Assert;
-import org.apache.asterix.aoya.AsterixYARNClient;
-import org.apache.asterix.aoya.Utils;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.Node;
-
public class AsterixYARNInstanceUtil {
private static final String PATH_ACTUAL = "ittest/";
private static final String INSTANCE_NAME = "asterix-integration-test";
diff --git a/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java b/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
index f0a56de..4e7ea8e 100644
--- a/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
+++ b/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
@@ -19,50 +19,24 @@
package org.apache.asterix.aoya.test;
import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.logging.Logger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runners.Parameterized.Parameters;
-import org.junit.FixMethodOrder;
-import org.junit.runners.MethodSorters;
-
import org.apache.asterix.aoya.AsterixYARNClient;
import org.apache.asterix.aoya.Utils;
-import org.apache.asterix.event.error.VerificationUtil;
-import org.apache.asterix.event.model.AsterixInstance;
-import org.apache.asterix.event.model.AsterixInstance.State;
-import org.apache.asterix.event.model.AsterixRuntimeState;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.Node;
-import org.apache.asterix.event.service.ServiceProvider;
-import org.apache.asterix.test.aql.TestExecutor;
-import org.apache.asterix.aoya.test.YARNCluster;
-import org.apache.asterix.common.configuration.AsterixConfiguration;
-import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized.Parameters;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class AsterixYARNLifecycleIT {
- private static final String PATH_ACTUAL = "ittest/";
private static final Logger LOGGER = Logger.getLogger(AsterixYARNLifecycleIT.class.getName());
private static final String INSTANCE_NAME = "asterix-integration-test";
private static YarnConfiguration appConf;
@@ -141,7 +115,8 @@
@Test
public void test_8_DeleteActiveInstance() throws Exception {
- String command = "-n " + INSTANCE_NAME + " -zip " + aoyaServerPath + " -f" + " -bc " + parameterPath + " destroy";
+ String command = "-n " + INSTANCE_NAME + " -zip " + aoyaServerPath + " -f" + " -bc " + parameterPath
+ + " destroy";
executeAoyaCommand(command);
}
diff --git a/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/YARNCluster.java b/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/YARNCluster.java
index fa95402..b23b919 100644
--- a/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/YARNCluster.java
+++ b/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/YARNCluster.java
@@ -19,40 +19,28 @@
package org.apache.asterix.aoya.test;
-import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
-
/**
* Manages a Mini (local VM) YARN cluster with a configured number of NodeManager(s).
- *
*/
-@SuppressWarnings("deprecation")
public class YARNCluster {
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final int nameNodePort = 31888;
- private static final String DATA_PATH = "data/hdfs";
- private static final String HDFS_PATH = "/asterix";
private static final YARNCluster INSTANCE = new YARNCluster();
private MiniYARNCluster miniCluster;
private int numDataNodes = 2;
private Configuration conf = new YarnConfiguration();
- private FileSystem dfs;
public static YARNCluster getInstance() {
return INSTANCE;
@@ -75,14 +63,13 @@
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, "target/integrationts/data");
cleanupLocal();
- //this constructor is deprecated in hadoop 2x
+ //this constructor is deprecated in hadoop 2x
//dfsCluster = new MiniDFSCluster(nameNodePort, conf, numDataNodes, true, true, StartupOption.REGULAR, null);
miniCluster = new MiniYARNCluster("Asterix_testing", numDataNodes, 1, 1);
miniCluster.init(conf);
- dfs = FileSystem.get(conf);
}
-
- public MiniYARNCluster getCluster(){
+
+ public MiniYARNCluster getCluster() {
return miniCluster;
}