Fix ASTERIXDB-1194,ASTERIXDB-1195,ASTERIXDB-1196,ASTERIXDB-1197.
Change-Id: I7d167b64bf9ec754182b5b2fe44dfc7e5908c686
Reviewed-on: https://asterix-gerrit.ics.uci.edu/325
Reviewed-by: Till Westmann <tillw@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2897a15..990decc 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,11 +19,15 @@
package org.apache.asterix.api.common;
import java.io.File;
+import java.io.IOException;
import java.util.EnumSet;
+import org.apache.asterix.common.config.AsterixPropertiesAccessor;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobFlag;
@@ -36,6 +40,7 @@
public class AsterixHyracksIntegrationUtil {
+ private static final String IO_DIR_KEY = "java.io.tmpdir";
public static final int NODES = 2;
public static final int PARTITONS = 2;
@@ -47,7 +52,16 @@
public static NodeControllerService[] ncs = new NodeControllerService[NODES];
public static IHyracksClientConnection hcc;
- public static void init() throws Exception {
+ protected static AsterixTransactionProperties txnProperties;
+
+ public static void init(boolean deleteOldInstanceData) throws Exception {
+ AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
+ txnProperties = new AsterixTransactionProperties(apa);
+ if (deleteOldInstanceData) {
+ deleteTransactionLogs();
+ removeTestStorageFiles();
+ }
+
CCConfig ccConfig = new CCConfig();
ccConfig.clusterNetIpAddress = "127.0.0.1";
ccConfig.clientNetIpAddress = "127.0.0.1";
@@ -61,6 +75,7 @@
cc = new ClusterControllerService(ccConfig);
cc.start();
+ // Starts ncs.
int n = 0;
for (String ncName : getNcNames()) {
NCConfig ncConfig1 = new NCConfig();
@@ -86,7 +101,6 @@
ncs[n].start();
++n;
}
-
hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
}
@@ -110,14 +124,20 @@
return hcc;
}
- public static void deinit() throws Exception {
+ public static void deinit(boolean deleteOldInstanceData) throws Exception {
for (int n = 0; n < ncs.length; ++n) {
if (ncs[n] != null)
ncs[n].stop();
}
- if (cc != null)
+ if (cc != null) {
cc.stop();
+ }
+
+ if (deleteOldInstanceData) {
+ deleteTransactionLogs();
+ removeTestStorageFiles();
+ }
}
public static void runJob(JobSpecification spec) throws Exception {
@@ -127,6 +147,23 @@
hcc.waitForCompletion(jobId);
}
+ private static void removeTestStorageFiles() throws IOException {
+ File dir = new File(System.getProperty(IO_DIR_KEY));
+ for (String ncName : AsterixHyracksIntegrationUtil.getNcNames()) {
+ File ncDir = new File(dir, ncName);
+ FileUtils.deleteQuietly(ncDir);
+ }
+ }
+
+ private static void deleteTransactionLogs() throws Exception {
+ for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
+ File log = new File(txnProperties.getLogDirectory(ncId));
+ if (log.exists()) {
+ FileUtils.deleteDirectory(log);
+ }
+ }
+ }
+
/**
* main method to run a simple 2 node cluster in-process
* suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code>
@@ -136,9 +173,10 @@
*/
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
public void run() {
try {
- deinit();
+ deinit(false);
} catch (Exception e) {
e.printStackTrace();
}
@@ -147,7 +185,7 @@
try {
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
- init();
+ init(false);
while (true) {
Thread.sleep(10000);
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
index b213bca..c83ce6a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -40,6 +40,7 @@
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.util.FlushDatasetUtils;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -108,7 +109,12 @@
pkStrBuf.delete(pkStrBuf.length() - 1, pkStrBuf.length());
// Constructs the returned json object.
- formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), hcc.getNodeControllerInfos());
+ formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), temp,
+ hcc.getNodeControllerInfos());
+
+ // Flush the cached contents of the dataset to file system.
+ FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseName, datasetName, datasetName);
+
// Metadata transaction commits.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
// Writes file splits.
@@ -123,8 +129,10 @@
}
private void formResponseObject(JSONObject jsonResponse, FileSplit[] fileSplits, ARecordType recordType,
- String primaryKeys, Map<String, NodeControllerInfo> nodeMap) throws Exception {
+ String primaryKeys, boolean temp, Map<String, NodeControllerInfo> nodeMap) throws Exception {
JSONArray partititons = new JSONArray();
+ // Whether the dataset is temp or not
+ jsonResponse.put("temp", temp);
// Adds a primary key.
jsonResponse.put("keys", primaryKeys);
// Adds record type.
@@ -133,7 +141,7 @@
for (FileSplit split : fileSplits) {
String ipAddress = nodeMap.get(split.getNodeName()).getNetworkAddress().getAddress().toString();
String path = split.getLocalFile().getFile().getAbsolutePath();
- FilePartition partition = new FilePartition(ipAddress, path);
+ FilePartition partition = new FilePartition(ipAddress, path, split.getIODeviceId());
partititons.put(partition.toJSONObject());
}
// Generates the response object which contains the splits.
@@ -144,10 +152,12 @@
class FilePartition {
private final String ipAddress;
private final String path;
+ private final int ioDeviceId;
- public FilePartition(String ipAddress, String path) {
+ public FilePartition(String ipAddress, String path, int ioDeviceId) {
this.ipAddress = ipAddress;
this.path = path;
+ this.ioDeviceId = ioDeviceId;
}
public String getIPAddress() {
@@ -158,6 +168,10 @@
return path;
}
+ public int getIODeviceId() {
+ return ioDeviceId;
+ }
+
@Override
public String toString() {
return ipAddress + ":" + path;
@@ -167,6 +181,7 @@
JSONObject partition = new JSONObject();
partition.put("ip", ipAddress);
partition.put("path", path);
+ partition.put("ioDeviceId", ioDeviceId);
return partition;
}
-}
\ No newline at end of file
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
index d75625f..3852020 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
@@ -34,17 +34,20 @@
super(compilationProvider);
}
+ @Override
protected String getQueryParameter(HttpServletRequest request) {
return request.getParameter("statements");
}
+ @Override
protected List<Statement.Kind> getAllowedStatements() {
Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.DELETE, Kind.INSERT, Kind.UPDATE, Kind.DML_CMD_LIST,
Kind.LOAD, Kind.CONNECT_FEED, Kind.DISCONNECT_FEED, Kind.SET, Kind.COMPACT,
- Kind.EXTERNAL_DATASET_REFRESH };
+ Kind.EXTERNAL_DATASET_REFRESH, Kind.RUN };
return Arrays.asList(statementsArray);
}
+ @Override
protected String getErrorMessage() {
return "Invalid statement: Non-Update statement %s to the Update API.";
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index fa55a47..51e1612 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -21,9 +21,9 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.PrintWriter;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Date;
@@ -40,10 +40,9 @@
import java.util.logging.Logger;
import org.apache.asterix.api.common.APIFramework;
-import org.apache.asterix.api.common.Job;
import org.apache.asterix.api.common.SessionConfig;
import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.common.config.AsterixCompilerProperties;
+import org.apache.asterix.common.config.AsterixExternalProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -149,13 +148,11 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeSignature;
import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
import org.apache.asterix.result.ResultReader;
import org.apache.asterix.result.ResultUtils;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -169,32 +166,25 @@
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.TypeTranslator;
import org.apache.asterix.translator.util.ValidateUtil;
+import org.apache.asterix.util.FlushDatasetUtils;
+import org.apache.asterix.util.JobUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import org.apache.hyracks.algebricks.data.IAWriterFactory;
import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.json.JSONArray;
import org.json.JSONException;
@@ -641,7 +631,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
//#. runJob
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
//#. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -674,8 +664,7 @@
JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
-
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -954,7 +943,7 @@
"Failed to create job spec for replicating Files Index For external dataset");
}
filesIndexReplicated = true;
- runJob(hcc, spec, true);
+ JobUtils.runJob(hcc, spec, true);
}
}
@@ -997,7 +986,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
//#. create the index artifact in NC.
- runJob(hcc, spec, true);
+ JobUtils.runJob(hcc, spec, true);
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
@@ -1011,7 +1000,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, spec, true);
+ JobUtils.runJob(hcc, spec, true);
//#. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1050,7 +1039,7 @@
metadataProvider, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -1072,7 +1061,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -1270,7 +1259,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1297,7 +1286,7 @@
// remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
} catch (Exception e2) {
//do no throw exception since still the metadata needs to be compensated.
@@ -1391,12 +1380,12 @@
//# disconnect the feeds
for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
- runJob(hcc, p.first, true);
+ JobUtils.runJob(hcc, p.first, true);
}
//#. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1434,7 +1423,7 @@
//#. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
if (indexes.size() > 0) {
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
@@ -1463,7 +1452,7 @@
// remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
} catch (Exception e2) {
//do no throw exception since still the metadata needs to be compensated.
@@ -1561,7 +1550,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
//#. begin a new transaction
@@ -1624,7 +1613,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
//#. begin a new transaction
@@ -1654,7 +1643,7 @@
// remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
} catch (Exception e2) {
//do no throw exception since still the metadata needs to be compensated.
@@ -1810,7 +1799,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
if (spec != null) {
- runJob(hcc, spec, true);
+ JobUtils.runJob(hcc, spec, true);
}
} catch (Exception e) {
if (bActiveTxn) {
@@ -1844,7 +1833,7 @@
bActiveTxn = false;
if (compiled != null) {
- runJob(hcc, compiled, true);
+ JobUtils.runJob(hcc, compiled, true);
}
} catch (Exception e) {
@@ -1880,7 +1869,7 @@
bActiveTxn = false;
if (compiled != null) {
- runJob(hcc, compiled, true);
+ JobUtils.runJob(hcc, compiled, true);
}
} catch (Exception e) {
@@ -2152,7 +2141,7 @@
for (IFeedJoint fj : triple.third) {
FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
}
- runJob(hcc, pair.first, false);
+ JobUtils.runJob(hcc, pair.first, false);
IFeedAdapterFactory adapterFactory = pair.second;
if (adapterFactory.isRecordTrackingEnabled()) {
FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
@@ -2323,7 +2312,7 @@
JobSpecification jobSpec = specDisconnectType.first;
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
if (!specDisconnectType.second) {
CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(connectionId);
@@ -2378,7 +2367,7 @@
bActiveTxn = false;
if (compiled != null) {
- runJob(hcc, alteredJobSpec, false);
+ JobUtils.runJob(hcc, alteredJobSpec, false);
}
} catch (Exception e) {
@@ -2429,7 +2418,6 @@
.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
jobsToExecute
.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
-
}
}
} else {
@@ -2458,7 +2446,7 @@
//#. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
+ JobUtils.runJob(hcc, jobSpec, true);
}
} catch (Exception e) {
if (bActiveTxn) {
@@ -2486,7 +2474,7 @@
if (sessionConfig.isExecuteQuery() && compiled != null) {
GlobalConfig.ASTERIX_LOGGER.info(compiled.toJSON().toString(1));
- JobId jobId = runJob(hcc, compiled, false);
+ JobId jobId = JobUtils.runJob(hcc, compiled, false);
JSONObject response = new JSONObject();
switch (resultDelivery) {
@@ -2664,14 +2652,14 @@
transactionState = ExternalDatasetTransactionState.BEGIN;
//run the files update job
- runJob(hcc, spec, true);
+ JobUtils.runJob(hcc, spec, true);
for (Index index : indexes) {
if (!ExternalIndexingOperations.isFileIndex(index)) {
spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, deletedFiles,
addedFiles, appendedFiles, metadataProvider);
//run the files update job
- runJob(hcc, spec, true);
+ JobUtils.runJob(hcc, spec, true);
}
}
@@ -2690,7 +2678,7 @@
bActiveTxn = false;
transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT;
// We don't release the latch since this job is expected to be quick
- runJob(hcc, spec, true);
+ JobUtils.runJob(hcc, spec, true);
// Start a new metadata transaction to record the final state of the transaction
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2760,7 +2748,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
try {
- runJob(hcc, spec, true);
+ JobUtils.runJob(hcc, spec, true);
} catch (Exception e2) {
// This should never happen -- fix throw illegal
e.addSuppressed(e2);
@@ -2812,179 +2800,53 @@
}
private void handlePregelixStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws AsterixException, Exception {
-
+ IHyracksClientConnection hcc) throws Exception {
RunStatement pregelixStmt = (RunStatement) stmt;
boolean bActiveTxn = true;
-
String dataverseNameFrom = getActiveDataverse(pregelixStmt.getDataverseNameFrom());
String dataverseNameTo = getActiveDataverse(pregelixStmt.getDataverseNameTo());
String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
- if (dataverseNameFrom != dataverseNameTo) {
- throw new AlgebricksException("Pregelix statements across different dataverses are not supported.");
- }
-
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- MetadataLockManager.INSTANCE.pregelixBegin(dataverseNameFrom, datasetNameFrom, datasetNameTo);
-
+ List<String> readDataverses = new ArrayList<String>();
+ readDataverses.add(dataverseNameFrom);
+ List<String> readDatasets = new ArrayList<String>();
+ readDatasets.add(datasetNameFrom);
+ MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseNameTo, datasetNameTo, readDataverses, readDatasets);
try {
+ prepareRunExternalRuntime(metadataProvider, hcc, pregelixStmt, dataverseNameFrom, dataverseNameTo,
+ datasetNameFrom, datasetNameTo, mdTxnCtx);
- // construct input paths
- Index fromIndex = null;
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom,
- pregelixStmt.getDatasetNameFrom().getValue());
- for (Index ind : indexes) {
- if (ind.isPrimaryIndex())
- fromIndex = ind;
+ String pregelixHomeKey = "PREGELIX_HOME";
+ // Finds PREGELIX_HOME in system environment variables.
+ String pregelixHome = System.getenv(pregelixHomeKey);
+ // Finds PREGELIX_HOME in Java properties.
+ if (pregelixHome == null) {
+ pregelixHome = System.getProperty(pregelixHomeKey);
+ }
+ // Finds PREGELIX_HOME in AsterixDB configuration.
+ if (pregelixHome == null) {
+ // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties, pregelixHome can never be null.
+ pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
}
- if (fromIndex == null) {
- throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameFrom);
- }
-
- Dataset datasetFrom = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameFrom, datasetNameFrom);
- IFileSplitProvider fromSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
- dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName(),
- datasetFrom.getDatasetDetails().isTemp()).first;
- StringBuilder fromSplitsPaths = new StringBuilder();
-
- for (FileSplit f : fromSplits.getFileSplits()) {
- fromSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
- fromSplitsPaths.append(",");
- }
- fromSplitsPaths.setLength(fromSplitsPaths.length() - 1);
-
- // Construct output paths
- Index toIndex = null;
- indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
- pregelixStmt.getDatasetNameTo().getValue());
- for (Index ind : indexes) {
- if (ind.isPrimaryIndex())
- toIndex = ind;
- }
-
- if (toIndex == null) {
- throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
- }
-
- Dataset datasetTo = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
- IFileSplitProvider toSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
- dataverseNameTo, datasetNameTo, toIndex.getIndexName(),
- datasetTo.getDatasetDetails().isTemp()).first;
- StringBuilder toSplitsPaths = new StringBuilder();
-
- for (FileSplit f : toSplits.getFileSplits()) {
- toSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
- toSplitsPaths.append(",");
- }
- toSplitsPaths.setLength(toSplitsPaths.length() - 1);
-
- try {
- Dataset toDataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
- DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo),
- pregelixStmt.getDatasetNameTo(), true);
- this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
-
- IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), false, null,
- toDataset.getDatasetDetails().isTemp());
- DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
- pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
- new Identifier(toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
- toDataset.getCompactionPolicyProperties(), toDataset.getHints(), toDataset.getDatasetType(),
- idd, false);
- this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
- }
-
- // Flush source dataset
- flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName());
-
- // call Pregelix
- String pregelix_home = System.getenv("PREGELIX_HOME");
- if (pregelix_home == null) {
- throw new AlgebricksException("PREGELIX_HOME is not defined!");
- }
-
- // construct command
- ArrayList<String> cmd = new ArrayList<String>();
- cmd.add("bin/pregelix");
- cmd.add(pregelixStmt.getParameters().get(0)); // jar
- cmd.add(pregelixStmt.getParameters().get(1)); // class
- for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
- cmd.add(s);
- }
- cmd.add("-inputpaths");
- cmd.add(fromSplitsPaths.toString());
- cmd.add("-outputpath");
- cmd.add(toSplitsPaths.toString());
-
- StringBuilder command = new StringBuilder();
- for (String s : cmd) {
- command.append(s);
- command.append(" ");
- }
- LOGGER.info("Running Pregelix Command: " + command.toString());
-
+ // Constructs the pregelix command line.
+ List<String> cmd = constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom,
+ dataverseNameTo, datasetNameTo);
ProcessBuilder pb = new ProcessBuilder(cmd);
- pb.directory(new File(pregelix_home));
+ pb.directory(new File(pregelixHome));
pb.redirectErrorStream(true);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
-
- Process pr = pb.start();
-
- int resultState = 0;
-
- BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
- String line;
- while ((line = in.readLine()) != null) {
- System.out.println(line);
- if (line.contains("job finished")) {
- resultState = 1;
- }
- if (line.contains("Exception") || line.contains("Error")) {
-
- if (line.contains("Connection refused")) {
- throw new AlgebricksException(
- "The connection to your Pregelix cluster was refused. Is it running? Is the port in the query correct?");
- }
-
- if (line.contains("Could not find or load main class")) {
- throw new AlgebricksException(
- "The main class of your Pregelix query was not found. Is the path to your .jar file correct?");
- }
-
- if (line.contains("ClassNotFoundException")) {
- throw new AlgebricksException(
- "The vertex class of your Pregelix query was not found. Does it exist? Is the spelling correct?");
- }
-
- if (line.contains("HyracksException")) {
- throw new AlgebricksException(
- "Something went wrong executing your Pregelix Job (HyracksException). Check the configuration of STORAGE_BUFFERCACHE_PAGESIZE and STORAGE_MEMORYCOMPONENT_PAGESIZE."
- + "It must match the one of Asterix. You can use managix describe -admin to find out the right configuration. "
- + "Check also if your datatypes in Pregelix and Asterix are matching.");
- }
-
- throw new AlgebricksException(
- "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
- + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
- + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
- }
- }
- pr.waitFor();
- in.close();
-
- if (resultState != 1) {
+ // Executes the Pregelix command.
+ int resultState = executeExternalShellProgram(pb);
+ // Checks the return state of the external Pregelix command.
+ if (resultState != 0) {
throw new AlgebricksException(
- "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
+ "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restarted. "
+ "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+ "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
}
@@ -2994,60 +2856,156 @@
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.pregelixEnd(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+ MetadataLockManager.INSTANCE.insertDeleteEnd(dataverseNameTo, datasetNameTo, readDataverses, readDatasets);
}
}
- private void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
- MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
- throws Exception {
- AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
- int frameSize = compilerProperties.getFrameSize();
- JobSpecification spec = new JobSpecification(frameSize);
+ // Prepares to run a program on external runtime.
+ private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+ RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
+ String datasetNameTo, MetadataTransactionContext mdTxnCtx)
+ throws AlgebricksException, AsterixException, Exception {
+ // Validates the source/sink dataverses and datasets.
+ Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
+ if (fromDataset == null) {
+ throw new AsterixException("The source dataset " + datasetNameFrom + " in dataverse " + dataverseNameFrom
+ + " could not be found for the Run command");
+ }
+ Dataset toDataset = metadataProvider.findDataset(dataverseNameTo, datasetNameTo);
+ if (toDataset == null) {
+ throw new AsterixException("The sink dataset " + datasetNameTo + " in dataverse " + dataverseNameTo
+ + " could not be found for the Run command");
+ }
- RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
- AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
- new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
+ try {
+ // Find the primary index of the sink dataset.
+ Index toIndex = null;
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
+ pregelixStmt.getDatasetNameTo().getValue());
+ for (Index index : indexes) {
+ if (index.isPrimaryIndex()) {
+ toIndex = index;
+ break;
+ }
+ }
+ if (toIndex == null) {
+ throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
+ }
+ // Cleans up the sink dataset -- Drop and then Create.
+ DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
+ true);
+ this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+ IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), false, null,
+ toDataset.getDatasetDetails().isTemp());
+ DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
+ pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
+ new Identifier(toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
+ toDataset.getCompactionPolicyProperties(), toDataset.getHints(), toDataset.getDatasetType(), idd,
+ false);
+ this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
+ }
- org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
- dataset.getDatasetId());
-
- spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
- dataset.getDatasetDetails().isTemp());
- AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
-
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
- primaryPartitionConstraint);
-
- JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true);
- spec.setJobletEventListenerFactory(jobEventListenerFactory);
- runJob(hcc, spec, true);
+ // Flushes source dataset.
+ FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom,
+ datasetNameFrom);
}
- private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
- throws Exception {
- JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, sessionConfig.out(), waitForCompletion);
- return jobIds[0];
+ // Executes external shell commands.
+ private int executeExternalShellProgram(ProcessBuilder pb)
+ throws IOException, AlgebricksException, InterruptedException {
+ Process process = pb.start();
+ try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+ String line;
+ while ((line = in.readLine()) != null) {
+ LOGGER.info(line);
+ if (line.contains("Exception") || line.contains("Error")) {
+ LOGGER.severe(line);
+ if (line.contains("Connection refused")) {
+ throw new AlgebricksException(
+ "The connection to your Pregelix cluster was refused. Is it running? Is the port in the query correct?");
+ }
+ if (line.contains("Could not find or load main class")) {
+ throw new AlgebricksException(
+ "The main class of your Pregelix query was not found. Is the path to your .jar file correct?");
+ }
+ if (line.contains("ClassNotFoundException")) {
+ throw new AlgebricksException(
+ "The vertex class of your Pregelix query was not found. Does it exist? Is the spelling correct?");
+ }
+ }
+ }
+ process.waitFor();
+ }
+ // Gets the exit value of the program.
+ int resultState = process.exitValue();
+ return resultState;
}
- public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, boolean waitForCompletion)
- throws Exception {
- JobId[] startedJobIds = new JobId[jobs.length];
- for (int i = 0; i < jobs.length; i++) {
- JobSpecification spec = jobs[i].getJobSpec();
- spec.setMaxReattempts(0);
- JobId jobId = hcc.startJob(spec);
- startedJobIds[i] = jobId;
- if (waitForCompletion) {
- hcc.waitForCompletion(jobId);
+ // Constructs a Pregelix command line.
+ private List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName,
+ String fromDatasetName, String toDataverseName, String toDatasetName) {
+ // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
+ AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+ AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE;
+ String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp();
+ StringBuilder asterixdbParameterBuilder = new StringBuilder();
+ asterixdbParameterBuilder.append(
+ "pregelix.asterixdb.url=" + "http://" + clientIP + ":" + externalProperties.getAPIServerPort() + ",");
+ asterixdbParameterBuilder.append("pregelix.asterixdb.source=true,");
+ asterixdbParameterBuilder.append("pregelix.asterixdb.sink=true,");
+ asterixdbParameterBuilder.append("pregelix.asterixdb.input.dataverse=" + fromDataverseName + ",");
+ asterixdbParameterBuilder.append("pregelix.asterixdb.input.dataset=" + fromDatasetName + ",");
+ asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataverse=" + toDataverseName + ",");
+ asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataset=" + toDatasetName + ",");
+ asterixdbParameterBuilder.append("pregelix.asterixdb.output.cleanup=false,");
+
+ // construct command
+ List<String> cmds = new ArrayList<String>();
+ cmds.add("bin/pregelix");
+ cmds.add(pregelixStmt.getParameters().get(0)); // jar
+ cmds.add(pregelixStmt.getParameters().get(1)); // class
+
+ String customizedPregelixProperty = "-cust-prop";
+ String inputConverterClassKey = "pregelix.asterixdb.input.converterclass";
+ String inputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdInputVertexConverter,";
+ String outputConverterClassKey = "pregelix.asterixdb.output.converterclass";
+ String outputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter,";
+ boolean custPropAdded = false;
+ boolean meetCustProp = false;
+ // User parameters.
+ for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
+ if (meetCustProp) {
+ if (!s.contains(inputConverterClassKey)) {
+ asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
+ }
+ if (!s.contains(outputConverterClassKey)) {
+ asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
+ }
+ cmds.add(asterixdbParameterBuilder.toString() + s);
+ meetCustProp = false;
+ custPropAdded = true;
+ continue;
+ }
+ cmds.add(s);
+ if (s.equals(customizedPregelixProperty)) {
+ meetCustProp = true;
}
}
- return startedJobIds;
+
+ if (!custPropAdded) {
+ cmds.add(customizedPregelixProperty);
+ // Appends default converter classes to asterixdbParameterBuilder.
+ asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
+ asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
+ // Remove the last comma.
+ asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
+ asterixdbParameterBuilder.length());
+ cmds.add(asterixdbParameterBuilder.toString());
+ }
+ return cmds;
}
private String getActiveDataverseName(String dataverse) throws AlgebricksException {
diff --git a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
index b4fa1d3..823cc4e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
+++ b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
@@ -80,11 +80,11 @@
File lsn = new File("last_checkpoint_lsn");
lsn.deleteOnExit();
- AsterixHyracksIntegrationUtil.init();
+ AsterixHyracksIntegrationUtil.init(false);
}
public static void tearDown() throws Exception {
- AsterixHyracksIntegrationUtil.deinit();
+ AsterixHyracksIntegrationUtil.deinit(false);
}
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index b7c210c..013e021 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -117,7 +117,6 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
metadataProvider.getMetadataTxnContext());
- // The index drop operation should be persistent regardless of temp datasets or permanent dataset
IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
@@ -125,7 +124,7 @@
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields, true));
+ filterCmpFactories, btreeFields, filterFields, !temp));
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
@@ -180,7 +179,6 @@
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
- // The index create operation should be persistent regardless of temp datasets or permanent dataset
TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
@@ -189,7 +187,7 @@
.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
- btreeFields, filterFields, true), localResourceFactoryProvider,
+ btreeFields, filterFields, !temp), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
index ebac99f..c5870a6 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -113,7 +113,7 @@
dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, true));
+ storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
AlgebricksPartitionConstraintHelper
.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
spec.addRoot(btreeDrop);
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
index 34517ca..7c14e5d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -96,13 +96,12 @@
secondaryBTreeFields, secondaryFilterFields);
localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
LocalResource.LSMBTreeResource);
- // The index create operation should be persistent regardless of temp datasets or permanent dataset.
indexDataflowHelperFactory = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
- secondaryBTreeFields, secondaryFilterFields, true);
+ secondaryBTreeFields, secondaryFilterFields, !dataset.getDatasetDetails().isTemp());
} else {
// External dataset local resource and dataflow helper
int[] buddyBreeFields = new int[] { numSecondaryKeys };
@@ -197,7 +196,7 @@
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
} else {
- // Create dummy key provider for feeding the primary index scan.
+ // Create dummy key provider for feeding the primary index scan.
AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
// Create primary index scan op.
diff --git a/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java b/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
new file mode 100644
index 0000000..7536c70
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.util;
+
+import org.apache.asterix.common.config.AsterixCompilerProperties;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
+import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class FlushDatasetUtils {
+
+ public static void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
+ MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
+ throws Exception {
+ AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+ int frameSize = compilerProperties.getFrameSize();
+ JobSpecification spec = new JobSpecification(frameSize);
+
+ RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
+ AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
+ new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
+
+ org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
+ dataset.getDatasetId());
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
+ dataset.getDatasetDetails().isTemp());
+ AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
+ primaryPartitionConstraint);
+
+ JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true);
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
+ JobUtils.runJob(hcc, spec, true);
+ }
+
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java b/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java
new file mode 100644
index 0000000..fb50b0c
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.util;
+
+import org.apache.asterix.api.common.Job;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class JobUtils {
+
+ public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
+ throws Exception {
+ JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, waitForCompletion);
+ return jobIds[0];
+ }
+
+ public static JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, boolean waitForCompletion)
+ throws Exception {
+ JobId[] startedJobIds = new JobId[jobs.length];
+ for (int i = 0; i < jobs.length; i++) {
+ JobSpecification spec = jobs[i].getJobSpec();
+ spec.setMaxReattempts(0);
+ JobId jobId = hcc.startJob(spec);
+ startedJobIds[i] = jobId;
+ if (waitForCompletion) {
+ hcc.waitForCompletion(jobId);
+ }
+ }
+ return startedJobIds;
+ }
+
+}
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index f8f33d6..8d0b7f3 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -66,6 +66,10 @@
<name>compiler.joinmemory</name>
<value>163840</value>
</property>
+ <property>
+ <name>compiler.pregelix.home</name>
+ <value>~/pregelix</value>
+ </property>
<property>
<name>storage.buffercache.pagesize</name>
<value>32768</value>
diff --git a/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java b/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
index 6da127c..80f5f5a 100644
--- a/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
@@ -103,7 +103,9 @@
new ByteArrayInputStream(outputStream.toByteArray())));
JSONObject actualResponse = new JSONObject(tokener);
- // Checks the data type of the dataset.
+ // Checks the temp-or-not, primary key, data type of the dataset.
+ boolean temp = actualResponse.getBoolean("temp");
+ Assert.assertFalse(temp);
String primaryKey = actualResponse.getString("keys");
Assert.assertEquals("DataverseName,DatasetName", primaryKey);
ARecordType recordType = (ARecordType) JSONDeserializerForTypes.convertFromJSON((JSONObject) actualResponse
@@ -144,20 +146,23 @@
nodeMap.put("nc2", mockInfo2);
PA.invokeMethod(servlet,
"formResponseObject(org.json.JSONObject, org.apache.hyracks.dataflow.std.file.FileSplit[], "
- + "org.apache.asterix.om.types.ARecordType, java.lang.String, java.util.Map)", actualResponse,
- splits, recordType, primaryKey, nodeMap);
+ + "org.apache.asterix.om.types.ARecordType, java.lang.String, boolean, java.util.Map)",
+ actualResponse, splits, recordType, primaryKey, true, nodeMap);
// Constructs expected response.
JSONObject expectedResponse = new JSONObject();
+ expectedResponse.put("temp", true);
expectedResponse.put("keys", primaryKey);
expectedResponse.put("type", recordType.toJSON());
JSONArray splitsArray = new JSONArray();
JSONObject element1 = new JSONObject();
element1.put("ip", "127.0.0.1");
element1.put("path", splits[0].getLocalFile().getFile().getAbsolutePath());
+ element1.put("ioDeviceId", 0);
JSONObject element2 = new JSONObject();
element2.put("ip", "127.0.0.2");
element2.put("path", splits[1].getLocalFile().getFile().getAbsolutePath());
+ element2.put("ioDeviceId", 0);
splitsArray.put(element1);
splitsArray.put(element2);
expectedResponse.put("splits", splitsArray);
diff --git a/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
new file mode 100644
index 0000000..89350aa
--- /dev/null
+++ b/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.aql.translator;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.api.common.SessionConfig;
+import org.apache.asterix.common.config.AsterixExternalProperties;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.MasterNode;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.RunStatement;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.junit.Test;
+
+import junit.extensions.PA;
+import junit.framework.Assert;
+
+@SuppressWarnings({ "unchecked", "deprecation" })
+public class QueryTranslatorTest {
+
+ @Test
+ public void test() throws Exception {
+ List<Statement> statements = new ArrayList<Statement>();
+ SessionConfig mockSessionConfig = mock(SessionConfig.class);
+ RunStatement mockRunStatement = mock(RunStatement.class);
+
+ // Mocks AsterixAppContextInfo.
+ AsterixAppContextInfo mockAsterixAppContextInfo = mock(AsterixAppContextInfo.class);
+ setFinalStaticField(AsterixAppContextInfo.class.getDeclaredField("INSTANCE"), mockAsterixAppContextInfo);
+ AsterixExternalProperties mockAsterixExternalProperties = mock(AsterixExternalProperties.class);
+ when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties);
+ when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002);
+
+ // Mocks AsterixClusterProperties.
+ Cluster mockCluster = mock(Cluster.class);
+ MasterNode mockMasterNode = mock(MasterNode.class);
+ AsterixClusterProperties mockClusterProperties = mock(AsterixClusterProperties.class);
+ setFinalStaticField(AsterixClusterProperties.class.getDeclaredField("INSTANCE"), mockClusterProperties);
+ when(mockClusterProperties.getCluster()).thenReturn(mockCluster);
+ when(mockCluster.getMasterNode()).thenReturn(mockMasterNode);
+ when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1");
+
+ QueryTranslator aqlTranslator = new QueryTranslator(statements, mockSessionConfig,
+ new AqlCompilationProvider());
+ List<String> parameters = new ArrayList<String>();
+ parameters.add("examples/pregelix-example-jar-with-dependencies.jar");
+ parameters.add("org.apache.pregelix.example.PageRankVertex");
+ parameters.add("-ip 10.0.2.15 -port 3199");
+ when(mockRunStatement.getParameters()).thenReturn(parameters);
+ // Test a customer command without "-cust-prop".
+ List<String> cmds = (List<String>) PA.invokeMethod(aqlTranslator,
+ "constructPregelixCommand(org.apache.asterix.lang.common.statement.RunStatement,"
+ + "String,String,String,String)",
+ mockRunStatement, "fromDataverse", "fromDataset", "toDataverse", "toDataset");
+ List<String> expectedCmds = Arrays.asList(new String[] { "bin/pregelix",
+ "examples/pregelix-example-jar-with-dependencies.jar", "org.apache.pregelix.example.PageRankVertex",
+ "-ip", "10.0.2.15", "-port", "3199", "-cust-prop",
+ "pregelix.asterixdb.url=http://127.0.0.1:19002,pregelix.asterixdb.source=true,pregelix.asterixdb.sink=true,pregelix.asterixdb.input.dataverse=fromDataverse,pregelix.asterixdb.input.dataset=fromDataset,pregelix.asterixdb.output.dataverse=toDataverse,pregelix.asterixdb.output.dataset=toDataset,pregelix.asterixdb.output.cleanup=false,pregelix.asterixdb.input.converterclass=org.apache.pregelix.example.converter.VLongIdInputVertexConverter,pregelix.asterixdb.output.converterclass=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter" });
+ Assert.assertEquals(cmds, expectedCmds);
+
+ parameters.remove(parameters.size() - 1);
+ parameters.add("-ip 10.0.2.15 -port 3199 -cust-prop "
+ + "pregelix.asterixdb.input.converterclass=org.apache.pregelix.example.converter.TestInputVertexConverter,"
+ + "pregelix.asterixdb.output.converterclass=org.apache.pregelix.example.converter.TestOutputVertexConverter");
+ // Test a customer command with "-cust-prop".
+ cmds = (List<String>) PA.invokeMethod(aqlTranslator,
+ "constructPregelixCommand(org.apache.asterix.lang.common.statement.RunStatement,"
+ + "String,String,String,String)",
+ mockRunStatement, "fromDataverse", "fromDataset", "toDataverse", "toDataset");
+ expectedCmds = Arrays.asList(new String[] { "bin/pregelix",
+ "examples/pregelix-example-jar-with-dependencies.jar", "org.apache.pregelix.example.PageRankVertex",
+ "-ip", "10.0.2.15", "-port", "3199", "-cust-prop",
+ "pregelix.asterixdb.url=http://127.0.0.1:19002,pregelix.asterixdb.source=true,pregelix.asterixdb.sink=true,pregelix.asterixdb.input.dataverse=fromDataverse,pregelix.asterixdb.input.dataset=fromDataset,pregelix.asterixdb.output.dataverse=toDataverse,pregelix.asterixdb.output.dataset=toDataset,pregelix.asterixdb.output.cleanup=false,pregelix.asterixdb.input.converterclass=org.apache.pregelix.example.converter.TestInputVertexConverter,pregelix.asterixdb.output.converterclass=org.apache.pregelix.example.converter.TestOutputVertexConverter" });
+ Assert.assertEquals(cmds, expectedCmds);
+ }
+
+ private void setFinalStaticField(Field field, Object newValue) throws Exception {
+ field.setAccessible(true);
+ // remove final modifier from field
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ field.set(null, newValue);
+ }
+}
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java b/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
index dff1c18..0a22f1c 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
@@ -54,7 +54,7 @@
}
outdir.mkdirs();
- AsterixHyracksIntegrationUtil.init();
+ AsterixHyracksIntegrationUtil.init(true);
Reader loadReader = new BufferedReader(
new InputStreamReader(new FileInputStream(LOAD_FOR_ENLIST_FILE), "UTF-8"));
AsterixJavaClient asterixLoad = new AsterixJavaClient(
@@ -69,7 +69,7 @@
}
asterixLoad.execute();
- AsterixHyracksIntegrationUtil.deinit();
+ AsterixHyracksIntegrationUtil.deinit(true);
for (String d : ASTERIX_DATA_DIRS) {
testExecutor.deleteRec(new File(d));
}
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
index 3788606..376b2ff 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
@@ -23,12 +23,9 @@
import java.util.Collection;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
-import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.test.aql.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -50,7 +47,6 @@
.join(new String[] { "src", "test", "resources", "metadata" + File.separator }, File.separator);
private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
- private static AsterixTransactionProperties txnProperties;
private static final TestExecutor testExecutor = new TestExecutor();
@BeforeClass
@@ -58,18 +54,12 @@
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
-
- AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
- txnProperties = new AsterixTransactionProperties(apa);
-
- deleteTransactionLogs();
-
- AsterixHyracksIntegrationUtil.init();
+ AsterixHyracksIntegrationUtil.init(true);
}
@AfterClass
public static void tearDown() throws Exception {
- AsterixHyracksIntegrationUtil.deinit();
+ AsterixHyracksIntegrationUtil.deinit(true);
File outdir = new File(PATH_ACTUAL);
File[] files = outdir.listFiles();
if (files == null || files.length == 0) {
@@ -82,15 +72,6 @@
}
}
- private static void deleteTransactionLogs() throws Exception {
- for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
- File log = new File(txnProperties.getLogDirectory(ncId));
- if (log.exists()) {
- FileUtils.deleteDirectory(log);
- }
- }
- }
-
@Parameters
public static Collection<Object[]> tests() throws Exception {
Collection<Object[]> testArgs = new ArrayList<Object[]>();
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index 092f898..2f8a910 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -30,8 +30,6 @@
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.api.java.AsterixJavaClient;
-import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.compiler.provider.AqlCompilationProvider;
@@ -40,7 +38,6 @@
import org.apache.asterix.external.util.IdentitiyResolverFactory;
import org.apache.asterix.test.base.AsterixTestHelper;
import org.apache.asterix.test.common.TestHelper;
-import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
@@ -71,8 +68,6 @@
private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
- private static AsterixTransactionProperties txnProperties;
-
@BeforeClass
public static void setUp() throws Exception {
// File outdir = new File(PATH_ACTUAL);
@@ -82,27 +77,13 @@
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
- AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
- txnProperties = new AsterixTransactionProperties(apa);
-
- deleteTransactionLogs();
-
- AsterixHyracksIntegrationUtil.init();
+ AsterixHyracksIntegrationUtil.init(true);
// Set the node resolver to be the identity resolver that expects node names
// to be node controller ids; a valid assumption in test environment.
System.setProperty(FileSystemBasedAdapter.NODE_RESOLVER_FACTORY_PROPERTY,
IdentitiyResolverFactory.class.getName());
}
- private static void deleteTransactionLogs() throws Exception {
- for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
- File log = new File(txnProperties.getLogDirectory(ncId));
- if (log.exists()) {
- FileUtils.deleteDirectory(log);
- }
- }
- }
-
@AfterClass
public static void tearDown() throws Exception {
// _bootstrap.stop();
@@ -111,7 +92,7 @@
if (files == null || files.length == 0) {
outdir.delete();
}
- AsterixHyracksIntegrationUtil.deinit();
+ AsterixHyracksIntegrationUtil.deinit(true);
}
private static void suiteBuild(File dir, Collection<Object[]> testArgs, String path) {
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 0473f6c..52bad6f 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -25,14 +25,11 @@
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
import org.apache.asterix.external.util.IdentitiyResolverFactory;
import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.asterix.testframework.xml.TestSuite;
-import org.apache.commons.io.FileUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
@@ -44,8 +41,6 @@
protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
- protected static AsterixTransactionProperties txnProperties;
-
protected static TestGroup FailedGroup;
public static void setUp() throws Exception {
@@ -55,15 +50,10 @@
}
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
- AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
- txnProperties = new AsterixTransactionProperties(apa);
-
- deleteTransactionLogs();
-
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("initializing pseudo cluster");
}
- AsterixHyracksIntegrationUtil.init();
+ AsterixHyracksIntegrationUtil.init(true);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("initializing HDFS");
@@ -81,19 +71,11 @@
FailedGroup.setName("failed");
}
- private static void deleteTransactionLogs() throws Exception {
- for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
- File log = new File(txnProperties.getLogDirectory(ncId));
- if (log.exists()) {
- FileUtils.deleteDirectory(log);
- }
- }
- }
-
private static void validateBufferCacheState() {
- for(NodeControllerService nc: AsterixHyracksIntegrationUtil.ncs){
- IAsterixAppRuntimeContext appCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext().getApplicationObject();
- if(!((BufferCache)appCtx.getBufferCache()).isClean()){
+ for (NodeControllerService nc : AsterixHyracksIntegrationUtil.ncs) {
+ IAsterixAppRuntimeContext appCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext()
+ .getApplicationObject();
+ if (!((BufferCache) appCtx.getBufferCache()).isClean()) {
throw new IllegalStateException();
}
}
@@ -101,7 +83,7 @@
public static void tearDown() throws Exception {
validateBufferCacheState();
- AsterixHyracksIntegrationUtil.deinit();
+ AsterixHyracksIntegrationUtil.deinit(true);
File outdir = new File(PATH_ACTUAL);
File[] files = outdir.listFiles();
if (files == null || files.length == 0) {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixCompilerProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixCompilerProperties.java
index 46a4a4f..42f612a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixCompilerProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixCompilerProperties.java
@@ -31,6 +31,9 @@
private static final String COMPILER_FRAMESIZE_KEY = "compiler.framesize";
private static int COMPILER_FRAMESIZE_DEFAULT = (32 << 10); // 32KB
+ private static final String COMPILER_PREGELIX_HOME = "compiler.pregelix.home";
+ private static final String COMPILER_PREGELIX_HOME_DEFAULT = "~/pregelix";
+
public AsterixCompilerProperties(AsterixPropertiesAccessor accessor) {
super(accessor);
}
@@ -55,4 +58,8 @@
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public String getPregelixHome() {
+ return accessor.getProperty(COMPILER_PREGELIX_HOME, COMPILER_PREGELIX_HOME_DEFAULT,
+ PropertyInterpreters.getStringPropertyInterpreter());
+ }
}
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index ab64cae..622ed3a 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -219,6 +219,11 @@
</description>
</property>
+ <property>
+ <name>compiler.pregelix.home</name>
+ <value>~/pregelix</value>
+ </property>
+
<property>
<name>web.port</name>
<value>19001</value>
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
index 867ce75..5d345b9 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
@@ -225,7 +225,7 @@
public void releaseFeedWriteLock(String feedName) {
feedsLocks.get(feedName).writeLock().unlock();
}
-
+
public void acquireFeedPolicyWriteLock(String policyName) {
ReentrantReadWriteLock fLock = feedPolicyLocks.get(policyName);
if (fLock == null) {
@@ -444,7 +444,7 @@
releaseFeedWriteLock(feedFullyQualifiedName);
releaseDataverseReadLock(dataverseName);
}
-
+
public void dropFeedPolicyBegin(String dataverseName, String policyName) {
releaseFeedWriteLock(policyName);
releaseDataverseReadLock(dataverseName);
@@ -486,7 +486,7 @@
releaseFeedPolicyWriteLock(policyName);
releaseDataverseReadLock(dataverseName);
}
-
+
public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
String feedFullyQualifiedName) {
acquireDataverseReadLock(dataverseName);
@@ -499,14 +499,13 @@
releaseDatasetReadLock(datasetFullyQualifiedName);
releaseDataverseReadLock(dataverseName);
}
-
- public void subscribeFeedBegin(String dataverseName, String datasetFullyQualifiedName,
- String feedFullyQualifiedName) {
+
+ public void subscribeFeedBegin(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
acquireDataverseReadLock(dataverseName);
acquireDatasetReadLock(datasetFullyQualifiedName);
acquireFeedReadLock(feedFullyQualifiedName);
}
-
+
public void subscribeFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
releaseFeedReadLock(feedFullyQualifiedName);
releaseDatasetReadLock(datasetFullyQualifiedName);
@@ -575,25 +574,4 @@
releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
releaseDataverseReadLock(dataverseName);
}
-
- public void pregelixBegin(String dataverseName, String datasetFullyQualifiedNameFrom,
- String datasetFullyQualifiedNameTo) {
- acquireDataverseReadLock(dataverseName);
-
- if (datasetFullyQualifiedNameFrom.compareTo(datasetFullyQualifiedNameTo) < 0) {
- acquireDatasetReadLock(datasetFullyQualifiedNameFrom);
- acquireDatasetWriteLock(datasetFullyQualifiedNameTo);
- } else {
- acquireDatasetWriteLock(datasetFullyQualifiedNameTo);
- acquireDatasetReadLock(datasetFullyQualifiedNameFrom);
- }
- }
-
- public void pregelixEnd(String dataverseName, String datasetFullyQualifiedNameFrom,
- String datasetFullyQualifiedNameTo) {
-
- releaseDatasetReadLock(datasetFullyQualifiedNameFrom);
- releaseDatasetWriteLock(datasetFullyQualifiedNameTo);
- releaseDataverseReadLock(dataverseName);
- }
}
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java b/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
index e65f545..42ea000 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
@@ -94,6 +94,7 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
+ sb.append(typeName + ": ");
if (isOpen) {
sb.append("open ");
} else {
@@ -290,6 +291,8 @@
@Override
public JSONObject toJSON() throws JSONException {
JSONObject type = new JSONObject();
+ type.put("type", ARecordType.class.getName());
+ type.put("name", typeName);
if (isOpen) {
type.put("open", true);
} else {
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/JSONDeserializerForTypes.java b/asterix-om/src/main/java/org/apache/asterix/om/util/JSONDeserializerForTypes.java
index 801bfb9..b646ea1 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/JSONDeserializerForTypes.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/JSONDeserializerForTypes.java
@@ -44,23 +44,21 @@
* @throws Exception
*/
public static IAType convertFromJSON(JSONObject typeInJSON) throws Exception {
- boolean typeNameExists = typeInJSON.has("type");
- String typeName = typeNameExists ? typeInJSON.getString("type") : null;
-
+ String typeName = typeInJSON.getString("type");
// Deals with ordered list.
- if (typeNameExists && typeName.equals(AOrderedListType.class.getName())) {
+ if (typeName.equals(AOrderedListType.class.getName())) {
IAType itemType = convertFromJSON((JSONObject) typeInJSON.get("item-type"));
return new AOrderedListType(itemType, "ordered-list");
}
// Deals with unordered list.
- if (typeNameExists && typeName.equals(AUnorderedListType.class.getName())) {
+ if (typeName.equals(AUnorderedListType.class.getName())) {
IAType itemType = convertFromJSON((JSONObject) typeInJSON.get("item-type"));
return new AUnorderedListType(itemType, "unordered-list");
}
// Deals with Union Type.
- if (typeNameExists && typeName.equals(AUnionType.class.getName())) {
+ if (typeName.equals(AUnionType.class.getName())) {
List<IAType> unionTypes = new ArrayList<IAType>();
JSONArray fields = (JSONArray) typeInJSON.get("fields");
for (int i = 0; i < fields.length(); i++) {
@@ -70,26 +68,26 @@
return new AUnionType(unionTypes, "union");
}
- // Deals with primitive types.
- if (typeNameExists) {
- Class<?> cl = BuiltinType.class;
- Field typeField = cl.getDeclaredField(typeName.toUpperCase());
- return (IAType) typeField.get(null);
- }
-
// Deals with record types.
- boolean openType = typeInJSON.getBoolean("open");
- JSONArray fields = typeInJSON.getJSONArray("fields");
- String[] fieldNames = new String[fields.length()];
- IAType[] fieldTypes = new IAType[fields.length()];
- for (int i = 0; i < fields.length(); ++i) {
- JSONObject field = (JSONObject) fields.get(i);
- JSONArray names = field.names();
- String fieldName = names.getString(0);
- fieldNames[i] = fieldName;
- fieldTypes[i] = convertFromJSON((JSONObject) field.get(fieldName));
+ if (typeName.equals(ARecordType.class.getName())) {
+ String name = typeInJSON.getString("name");
+ boolean openType = typeInJSON.getBoolean("open");
+ JSONArray fields = typeInJSON.getJSONArray("fields");
+ String[] fieldNames = new String[fields.length()];
+ IAType[] fieldTypes = new IAType[fields.length()];
+ for (int i = 0; i < fields.length(); ++i) {
+ JSONObject field = (JSONObject) fields.get(i);
+ JSONArray names = field.names();
+ String fieldName = names.getString(0);
+ fieldNames[i] = fieldName;
+ fieldTypes[i] = convertFromJSON((JSONObject) field.get(fieldName));
+ }
+ return new ARecordType(name, fieldNames, fieldTypes, openType);
}
- return new ARecordType("record", fieldNames, fieldTypes, openType);
- }
+ // Deals with primitive types.
+ Class<?> cl = BuiltinType.class;
+ Field typeField = cl.getDeclaredField(typeName.toUpperCase());
+ return (IAType) typeField.get(null);
+ }
}
diff --git a/asterix-tools/pom.xml b/asterix-tools/pom.xml
index cbb322d..50eee60 100644
--- a/asterix-tools/pom.xml
+++ b/asterix-tools/pom.xml
@@ -224,13 +224,13 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
- <version>4.2.2</version>
+ <version>4.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
- <version>4.2.2</version>
+ <version>4.3</version>
<scope>compile</scope>
</dependency>
</dependencies>
diff --git a/asterix-yarn/src/main/resources/base-asterix-configuration.xml b/asterix-yarn/src/main/resources/base-asterix-configuration.xml
index ab64cae..622ed3a 100644
--- a/asterix-yarn/src/main/resources/base-asterix-configuration.xml
+++ b/asterix-yarn/src/main/resources/base-asterix-configuration.xml
@@ -219,6 +219,11 @@
</description>
</property>
+ <property>
+ <name>compiler.pregelix.home</name>
+ <value>~/pregelix</value>
+ </property>
+
<property>
<name>web.port</name>
<value>19001</value>
diff --git a/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml b/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
index ab64cae..622ed3a 100644
--- a/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
+++ b/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
@@ -219,6 +219,11 @@
</description>
</property>
+ <property>
+ <name>compiler.pregelix.home</name>
+ <value>~/pregelix</value>
+ </property>
+
<property>
<name>web.port</name>
<value>19001</value>