ASTERIXDB-1302 ASTERIXDB-1301 Fix Socket Feed Connection
A bug caused a read lock never to be released when a feed was
connected with "wait-for-completion" set to false. The bug
was fixed and a test case was added.
Another bug prevented the socket feed from receiving
connections correctly. That bug was also fixed and a test
case was added.
Additionally, this change ensures that adapters use absolute
partition constraints, keeping them consistent with the feed
log manager.
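
For context, the locking fix follows the usual acquire-then-release-in-finally
pattern, so the read lock is dropped no matter how the connect request exits.
The sketch below is illustrative only; FeedLockExample, doConnect,
startFeedJob, and waitForFeedToEnd are hypothetical names standing in for the
QueryTranslator/MetadataLockManager logic, not actual AsterixDB APIs.

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative sketch of the fixed locking pattern (hypothetical names).
    public class FeedLockExample {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        public void doConnect(boolean waitForCompletion) {
            lock.readLock().lock();           // analogous to connectFeedBegin
            try {
                startFeedJob();
                if (waitForCompletion) {
                    waitForFeedToEnd();       // blocking call
                }
            } finally {
                // Released unconditionally; the old code skipped the release
                // when wait-for-completion was false, leaking the read lock.
                lock.readLock().unlock();     // analogous to connectFeedEnd
            }
        }

        private void startFeedJob() { /* submit intake/collect jobs */ }
        private void waitForFeedToEnd() { /* block until FEED_ENDED */ }
    }

With the release moved into the finally block, later disconnect or drop
statements can acquire the metadata lock instead of blocking indefinitely.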
Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
Reviewed-on: https://asterix-gerrit.ics.uci.edu/660
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ildar Absalyamov <ildar.absalyamov@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..be9452b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,8 @@
if (tempPath.endsWith(File.separator)) {
tempPath = tempPath.substring(0, tempPath.length() - 1);
}
- //get initial partitions from properties
+ System.err.println("Using the path: " + tempPath);
+ // get initial partitions from properties
String[] nodeStores = propertiesAccessor.getStores().get(ncName);
if (nodeStores == null) {
throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +98,7 @@
tempDirPath += File.separator;
}
for (int p = 0; p < nodeStores.length; p++) {
- //create IO devices based on stores
+ // create IO devices based on stores
String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
File ioDeviceDir = new File(iodevicePath);
ioDeviceDir.mkdirs();
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index c0245d7..5cd490a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.feed.api.IFeedJoint;
import org.apache.asterix.external.feed.api.IFeedMessage;
@@ -37,9 +38,11 @@
import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.file.JobSpecificationUtils;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -49,6 +52,9 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
/**
@@ -251,4 +257,17 @@
completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
}
+
+ public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+ FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(),
+ locations);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil
+ .splitProviderAndPartitionConstraints(feedLogFileSplits);
+ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
+ spec.addRoot(frod);
+ return spec;
+ }
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 9f024e9..ea50221 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -209,7 +209,7 @@
ASYNC_DEFERRED
}
- public static final boolean IS_DEBUG_MODE = false;//true
+ public static final boolean IS_DEBUG_MODE = false;// true
private final List<Statement> statements;
private final SessionConfig sessionConfig;
private Dataverse activeDefaultDataverse;
@@ -593,8 +593,9 @@
}
if (compactionPolicy == null) {
if (filterField != null) {
- // If the dataset has a filter and the user didn't specify a merge policy, then we will pick the
- // correlated-prefix as the default merge policy.
+ //If the dataset has a filter and the user didn't specify a merge
+ //policy, then we will pick the
+ //correlated-prefix as the default merge policy.
compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
}
@@ -659,10 +660,10 @@
if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
//#. execute compensation operations
- // remove the index in NC
- // [Notice]
- // As long as we updated(and committed) metadata, we should remove any effect of the job
- // because an exception occurs during runJob.
+ //remove the index in NC
+ //[Notice]
+ //As long as we updated(and committed) metadata, we should remove any effect of the job
+ //because an exception occurs during runJob.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -679,7 +680,7 @@
}
}
- // remove the record from the metadata.
+ //remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
@@ -803,7 +804,7 @@
String indexName = null;
JobSpecification spec = null;
Dataset ds = null;
- // For external datasets
+ //For external datasets
ArrayList<ExternalFile> externalFilesSnapshot = null;
boolean firstExternalDatasetIndex = false;
boolean filesIndexReplicated = false;
@@ -880,8 +881,10 @@
}
}
- // Checks whether a user is trying to create an inverted secondary index on a dataset with a variable-length primary key.
- // Currently, we do not support this. Therefore, as a temporary solution, we print an error message and stop.
+ //Checks whether a user is trying to create an inverted secondary index on a dataset
+ //with a variable-length primary key.
+ //Currently, we do not support this. Therefore, as a temporary solution, we print an
+ //error message and stop.
if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
|| stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
|| stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -891,7 +894,7 @@
IAType keyType = aRecordType.getSubFieldType(partitioningKey);
ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- // If it is not a fixed length
+ //If it is not a fixed length
if (typeTrait.getFixedLength() < 0) {
throw new AlgebricksException("The keyword or ngram index -" + indexName
+ " cannot be created on the dataset -" + datasetName
@@ -904,27 +907,27 @@
if (ds.getDatasetType() == DatasetType.INTERNAL) {
validateIfResourceIsActiveInFeed(dataverseName, datasetName);
} else {
- // External dataset
- // Check if the dataset is indexible
+ //External dataset
+ //Check if the dataset is indexible
if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
throw new AlgebricksException(
"dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
+ " Adapter can't be indexed");
}
- // check if the name of the index is valid
+ //Check if the name of the index is valid
if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
throw new AlgebricksException("external dataset index name is invalid");
}
- // Check if the files index exist
+ //Check if the files index exist
filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
firstExternalDatasetIndex = (filesIndex == null);
- // lock external dataset
+ //Lock external dataset
ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
datasetLocked = true;
if (firstExternalDatasetIndex) {
- // verify that no one has created an index before we acquire the lock
+ //Verify that no one has created an index before we acquire the lock
filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
if (filesIndex != null) {
@@ -934,20 +937,20 @@
}
}
if (firstExternalDatasetIndex) {
- // Get snapshot from External File System
+ //Get snapshot from External File System
externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
- // Add an entry for the files index
+ //Add an entry for the files index
filesIndex = new Index(dataverseName, datasetName,
ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES,
ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
IMetadataEntity.PENDING_ADD_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
- // Add files to the external files index
+ //Add files to the external files index
for (ExternalFile file : externalFilesSnapshot) {
MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
}
- // This is the first index for the external dataset, replicate the files index
+ //This is the first index for the external dataset, replicate the files index
spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
metadataProvider, true);
if (spec == null) {
@@ -1025,13 +1028,14 @@
indexName);
index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
- // add another new files index with PendingNoOp after deleting the index with PendingAddOp
+ //add another new files index with PendingNoOp after deleting the index with
+ //PendingAddOp
if (firstExternalDatasetIndex) {
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
filesIndex.getIndexName());
filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
- // update transaction timestamp
+ //update transaction timestamp
((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
}
@@ -1041,7 +1045,7 @@
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
- // If files index was replicated for external dataset, it should be cleaned up on NC side
+ //If files index was replicated for external dataset, it should be cleaned up on NC side
if (filesIndexReplicated) {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
@@ -1063,7 +1067,7 @@
if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
//#. execute compensation operations
- // remove the index in NC
+ //remove the index in NC
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1086,7 +1090,7 @@
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
- // Drop External Files from metadata
+ //Drop External Files from metadata
MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
@@ -1098,7 +1102,7 @@
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
- // Drop the files index from metadata
+ //Drop the files index from metadata
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1110,7 +1114,7 @@
+ ") couldn't be removed from the metadata", e);
}
}
- // remove the record from the metadata.
+ //remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
@@ -1183,7 +1187,6 @@
MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
-
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
if (stmtDelete.getIfExists()) {
@@ -1216,6 +1219,9 @@
+ connection.getDatasetName() + ". Encountered exception " + exception);
}
}
+ //prepare job to remove feed log storage
+ jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+ MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedId.getFeedName())));
}
}
@@ -1239,7 +1245,7 @@
CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
} else {
- // External dataset
+ //External dataset
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
datasetName);
for (int k = 0; k < indexes.size(); k++) {
@@ -1260,8 +1266,9 @@
}
jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
//#. mark PendingDropOp on the dataverse record by
- // first, deleting the dataverse record from the DATAVERSE_DATASET
- // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+ //first, deleting the dataverse record from the DATAVERSE_DATASET
+ //second, inserting the dataverse record with the PendingDropOp value into the
+ //DATAVERSE_DATASET
MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
@@ -1295,7 +1302,7 @@
}
//#. execute compensation operations
- // remove the all indexes in NC
+ //remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
@@ -1305,7 +1312,7 @@
e.addSuppressed(e2);
}
- // remove the record from the metadata.
+ //remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
try {
MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
@@ -1352,7 +1359,7 @@
Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
if (ds.getDatasetType() == DatasetType.INTERNAL) {
- // prepare job spec(s) that would disconnect any active feeds involving the dataset.
+ //prepare job spec(s) that would disconnect any active feeds involving the dataset.
List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
if (feedConnections != null && !feedConnections.isEmpty()) {
for (FeedConnectionId connection : feedConnections) {
@@ -1363,6 +1370,10 @@
LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
+ datasetName + " as dataset is being dropped");
}
+ //prepare job to remove feed log storage
+ jobsToExecute
+ .add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(mdTxnCtx,
+ connection.getFeedId().getDataverse(), connection.getFeedId().getFeedName())));
}
}
@@ -1404,7 +1415,7 @@
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
} else {
- // External dataset
+ //External dataset
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
//#. prepare jobs to drop the datatset and the indexes in NC
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
@@ -1447,7 +1458,7 @@
//#. finally, delete the dataset.
MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
- // Drop the associated nodegroup
+ //Drop the associated nodegroup
String nodegroup = ds.getNodeGroupName();
if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName);
@@ -1461,7 +1472,7 @@
if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
//#. execute compensation operations
- // remove the all indexes in NC
+ //remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
@@ -1471,7 +1482,7 @@
e.addSuppressed(e2);
}
- // remove the record from the metadata.
+ //remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
@@ -1506,7 +1517,7 @@
MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
String indexName = null;
- // For external index
+ //For external index
boolean dropFilesIndex = false;
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
@@ -1573,7 +1584,7 @@
//#. finally, delete the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
} else {
- // External dataset
+ //External dataset
indexName = stmtIndexDrop.getIndexName().getValue();
Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
if (index == null) {
@@ -1593,7 +1604,7 @@
datasetName);
if (datasetIndexes.size() == 2) {
dropFilesIndex = true;
- // only one index + the files index, we need to delete both of the indexes
+ //only one index + the files index, we need to delete both of the indexes
for (Index externalIndex : datasetIndexes) {
if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
cds = new CompiledIndexDropStatement(dataverseName, datasetName,
@@ -1636,7 +1647,7 @@
//#. finally, delete the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
if (dropFilesIndex) {
- // delete the files index too
+ //delete the files index too
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
ExternalIndexingOperations.getFilesIndexName(datasetName));
MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
@@ -1652,7 +1663,7 @@
if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
//#. execute compensation operations
- // remove the all indexes in NC
+ //remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
@@ -1662,7 +1673,7 @@
e.addSuppressed(e2);
}
- // remove the record from the metadata.
+ //remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
@@ -1916,11 +1927,11 @@
ICompiledDmlStatement stmt)
throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
- // Query Rewriting (happens under the same ongoing metadata transaction)
+ //Query Rewriting (happens under the same ongoing metadata transaction)
Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
sessionConfig);
- // Query Compilation (happens under the same ongoing metadata transaction)
+ //Query Compilation (happens under the same ongoing metadata transaction)
JobSpecification spec = apiFramework.compileQuery(declaredFunctions, metadataProvider, reWrittenQuery.first,
reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt);
@@ -1930,14 +1941,12 @@
private void handleCreateFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
-
CreateFeedStatement cfs = (CreateFeedStatement) stmt;
String dataverseName = getActiveDataverse(cfs.getDataverseName());
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
-
Feed feed = null;
try {
feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -2065,6 +2074,9 @@
throw new AlgebricksException("Feed " + feedId
+ " is currently active and connected to the following dataset(s) \n" + builder.toString());
} else {
+ JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(
+ MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName()));
+ JobUtils.runJob(hcc, spec, true);
MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
}
@@ -2120,7 +2132,6 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- boolean readLatchAcquired = true;
boolean subscriberRegistered = false;
IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
FeedConnectionId feedConnId = null;
@@ -2149,7 +2160,7 @@
FeedPolicyEntity feedPolicy = FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(),
mdTxnCtx);
- // All Metadata checks have passed. Feed connect request is valid. //
+ //All Metadata checks have passed. Feed connect request is valid. //
FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = getFeedConnectionRequest(dataverseName,
@@ -2165,19 +2176,20 @@
feedId.getFeedName());
Pair<JobSpecification, IAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
metadataProvider, policyAccessor);
- // adapter configuration are valid at this stage
- // register the feed joints (these are auto-de-registered)
+ //adapter configuration are valid at this stage
+ //register the feed joints (these are auto-de-registered)
for (IFeedJoint fj : triple.third) {
FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
}
JobUtils.runJob(hcc, pair.first, false);
- /* TODO: Fix record tracking
+ /*
+ * TODO: Fix record tracking
* IFeedAdapterFactory adapterFactory = pair.second;
- if (adapterFactory.isRecordTrackingEnabled()) {
- FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
- adapterFactory.createIntakeProgressTracker());
- }
- */
+ * if (adapterFactory.isRecordTrackingEnabled()) {
+ * FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
+ * adapterFactory.createIntakeProgressTracker());
+ * }
+ */
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
} else {
for (IFeedJoint fj : triple.third) {
@@ -2186,29 +2198,18 @@
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- readLatchAcquired = false;
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
}
- String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION);
- boolean waitForCompletion = waitForCompletionParam == null ? false
- : Boolean.valueOf(waitForCompletionParam);
- if (waitForCompletion) {
- MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
- dataverseName + "." + feedName);
- readLatchAcquired = false;
- }
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
throw e;
} finally {
- if (readLatchAcquired) {
- MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
- dataverseName + "." + feedName);
- }
+ MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+ dataverseName + "." + feedName);
if (subscriberRegistered) {
FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(feedConnId, eventSubscriber);
}
@@ -2242,7 +2243,7 @@
boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
if (!isFeedJointAvailable) {
sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
- if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
+ if (sourceFeedJoint == null) { //the feed is currently not being ingested, i.e., it is unavailable.
connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE;
FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
@@ -2262,8 +2263,8 @@
functionsToApply.add(f);
}
}
- // register the compute feed point that represents the final output from the collection of
- // functions that will be applied.
+ //register the compute feed point that represents the final output from the collection of
+ //functions that will be applied.
if (!functionsToApply.isEmpty()) {
FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
@@ -2435,7 +2436,7 @@
Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
ds.getItemTypeDataverseName(), itemTypeName);
- // Prepare jobs to compact the datatset and its indexes
+ //Prepare jobs to compact the datatset and its indexes
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
if (indexes.size() == 0) {
throw new AlgebricksException(
@@ -2523,9 +2524,9 @@
ResultReader resultReader = new ResultReader(hcc, hdc);
resultReader.open(jobId, metadataProvider.getResultSetId());
- // In this case (the normal case), we don't use the
- // "response" JSONObject - just stream the results
- // to the "out" PrintWriter
+ //In this case (the normal case), we don't use the
+ //"response" JSONObject - just stream the results
+ //to the "out" PrintWriter
if (sessionConfig.fmt() == OutputFormat.CSV
&& sessionConfig.is(SessionConfig.FORMAT_CSV_HEADER)) {
ResultUtils.displayCSVHeader(metadataProvider.findOutputRecordType(), sessionConfig);
@@ -2554,7 +2555,7 @@
throw e;
} finally {
MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
- // release external datasets' locks acquired during compilation of the query
+ //release external datasets' locks acquired during compilation of the query
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -2615,55 +2616,56 @@
ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName);
- // Dataset exists ?
+ //Dataset exists ?
if (ds == null) {
throw new AlgebricksException(
"There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
}
- // Dataset external ?
+ //Dataset external ?
if (ds.getDatasetType() != DatasetType.EXTERNAL) {
throw new AlgebricksException(
"dataset " + datasetName + " in dataverse " + dataverseName + " is not an external dataset");
}
- // Dataset has indexes ?
+ //Dataset has indexes ?
indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
if (indexes.size() == 0) {
throw new AlgebricksException("External dataset " + datasetName + " in dataverse " + dataverseName
+ " doesn't have any index");
}
- // Record transaction time
+ //Record transaction time
Date txnTime = new Date();
- // refresh lock here
+ //refresh lock here
ExternalDatasetsRegistry.INSTANCE.refreshBegin(ds);
lockAquired = true;
- // Get internal files
+ //Get internal files
metadataFiles = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, ds);
deletedFiles = new ArrayList<ExternalFile>();
addedFiles = new ArrayList<ExternalFile>();
appendedFiles = new ArrayList<ExternalFile>();
- // Compute delta
- // Now we compare snapshot with external file system
+ //Compute delta
+ //Now we compare snapshot with external file system
if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles,
appendedFiles)) {
((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- // latch will be released in the finally clause
+ //latch will be released in the finally clause
return;
}
- // At this point, we know data has changed in the external file system, record transaction in metadata and start
+ //At this point, we know data has changed in the external file system, record
+ //transaction in metadata and start
transactionDataset = ExternalIndexingOperations.createTransactionDataset(ds);
/*
* Remove old dataset record and replace it with a new one
*/
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset);
- // Add delta files to the metadata
+ //Add delta files to the metadata
for (ExternalFile file : addedFiles) {
MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
}
@@ -2674,7 +2676,7 @@
MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
}
- // Create the files index update job
+ //Create the files index update job
spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, deletedFiles, addedFiles,
appendedFiles, metadataProvider);
@@ -2694,10 +2696,10 @@
}
}
- // all index updates has completed successfully, record transaction state
+ //all index updates has completed successfully, record transaction state
spec = ExternalIndexingOperations.buildCommitJob(ds, indexes, metadataProvider);
- // Aquire write latch again -> start a transaction and record the decision to commit
+ //Aquire write latch again -> start a transaction and record the decision to commit
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
bActiveTxn = true;
@@ -2708,9 +2710,9 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT;
- // We don't release the latch since this job is expected to be quick
+ //We don't release the latch since this job is expected to be quick
JobUtils.runJob(hcc, spec, true);
- // Start a new metadata transaction to record the final state of the transaction
+ //Start a new metadata transaction to record the final state of the transaction
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
bActiveTxn = true;
@@ -2723,11 +2725,11 @@
while (iterator.hasNext()) {
ExternalFile appendedFile = iterator.next();
if (file.getFileName().equals(appendedFile.getFileName())) {
- // delete existing file
+ //delete existing file
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
- // delete existing appended file
+ //delete existing appended file
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile);
- // add the original file with appended information
+ //add the original file with appended information
appendedFile.setFileNumber(file.getFileNumber());
appendedFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, appendedFile);
@@ -2737,24 +2739,24 @@
}
}
- // remove the deleted files delta
+ //remove the deleted files delta
for (ExternalFile file : deletedFiles) {
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
}
- // insert new files
+ //insert new files
for (ExternalFile file : addedFiles) {
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
}
- // mark the transaction as complete
+ //mark the transaction as complete
((ExternalDatasetDetails) transactionDataset.getDatasetDetails())
.setState(ExternalDatasetTransactionState.COMMIT);
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset);
- // commit metadata transaction
+ //commit metadata transaction
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
success = true;
} catch (Exception e) {
@@ -2766,12 +2768,12 @@
+ datasetName + ") refresh couldn't carry out the commit phase", e);
}
if (transactionState == ExternalDatasetTransactionState.COMMIT) {
- // Nothing to do , everything should be clean
+ //Nothing to do , everything should be clean
throw e;
}
if (transactionState == ExternalDatasetTransactionState.BEGIN) {
- // transaction failed, need to do the following
- // clean NCs removing transaction components
+ //transaction failed, need to do the following
+ //clean NCs removing transaction components
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2781,12 +2783,12 @@
try {
JobUtils.runJob(hcc, spec, true);
} catch (Exception e2) {
- // This should never happen -- fix throw illegal
+ //This should never happen -- fix throw illegal
e.addSuppressed(e2);
throw new IllegalStateException("System is in inconsistent state. Failed to abort refresh", e);
}
- // remove the delta of files
- // return the state of the dataset to committed
+ //remove the delta of files
+ //return the state of the dataset to committed
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
for (ExternalFile file : deletedFiles) {
@@ -2799,7 +2801,7 @@
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
}
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
- // commit metadata transaction
+ //commit metadata transaction
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
abort(e, e2, mdTxnCtx);
@@ -2852,19 +2854,20 @@
datasetNameFrom, datasetNameTo, mdTxnCtx);
String pregelixHomeKey = "PREGELIX_HOME";
- // Finds PREGELIX_HOME in system environment variables.
+ //Finds PREGELIX_HOME in system environment variables.
String pregelixHome = System.getenv(pregelixHomeKey);
- // Finds PREGELIX_HOME in Java properties.
+ //Finds PREGELIX_HOME in Java properties.
if (pregelixHome == null) {
pregelixHome = System.getProperty(pregelixHomeKey);
}
- // Finds PREGELIX_HOME in AsterixDB configuration.
+ //Finds PREGELIX_HOME in AsterixDB configuration.
if (pregelixHome == null) {
- // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties, pregelixHome can never be null.
+ //Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
+ //pregelixHome can never be null.
pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
}
- // Constructs the pregelix command line.
+ //Constructs the pregelix command line.
List<String> cmd = constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom,
dataverseNameTo, datasetNameTo);
ProcessBuilder pb = new ProcessBuilder(cmd);
@@ -2873,9 +2876,9 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- // Executes the Pregelix command.
+ //Executes the Pregelix command.
int resultState = executeExternalShellProgram(pb);
- // Checks the return state of the external Pregelix command.
+ //Checks the return state of the external Pregelix command.
if (resultState != 0) {
throw new AlgebricksException(
"Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restarted. "
@@ -2893,12 +2896,12 @@
}
}
- // Prepares to run a program on external runtime.
+ //Prepares to run a program on external runtime.
private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
String datasetNameTo, MetadataTransactionContext mdTxnCtx)
throws AlgebricksException, AsterixException, Exception {
- // Validates the source/sink dataverses and datasets.
+ //Validates the source/sink dataverses and datasets.
Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
if (fromDataset == null) {
throw new AsterixException("The source dataset " + datasetNameFrom + " in dataverse " + dataverseNameFrom
@@ -2911,7 +2914,7 @@
}
try {
- // Find the primary index of the sink dataset.
+ //Find the primary index of the sink dataset.
Index toIndex = null;
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
pregelixStmt.getDatasetNameTo().getValue());
@@ -2924,7 +2927,7 @@
if (toIndex == null) {
throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
}
- // Cleans up the sink dataset -- Drop and then Create.
+ //Cleans up the sink dataset -- Drop and then Create.
DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
true);
this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
@@ -2941,12 +2944,12 @@
throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
}
- // Flushes source dataset.
+ //Flushes source dataset.
FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom,
datasetNameFrom);
}
- // Executes external shell commands.
+ //Executes external shell commands.
private int executeExternalShellProgram(ProcessBuilder pb)
throws IOException, AlgebricksException, InterruptedException {
Process process = pb.start();
@@ -2972,15 +2975,15 @@
}
process.waitFor();
}
- // Gets the exit value of the program.
+ //Gets the exit value of the program.
int resultState = process.exitValue();
return resultState;
}
- // Constructs a Pregelix command line.
+ //Constructs a Pregelix command line.
private List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName,
String fromDatasetName, String toDataverseName, String toDatasetName) {
- // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
+ //Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE;
String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp();
@@ -2995,7 +2998,7 @@
asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataset=" + toDatasetName + ",");
asterixdbParameterBuilder.append("pregelix.asterixdb.output.cleanup=false,");
- // construct command
+ //construct command
List<String> cmds = new ArrayList<String>();
cmds.add("bin/pregelix");
cmds.add(pregelixStmt.getParameters().get(0)); // jar
@@ -3008,7 +3011,7 @@
String outputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter,";
boolean custPropAdded = false;
boolean meetCustProp = false;
- // User parameters.
+ //User parameters.
for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
if (meetCustProp) {
if (!s.contains(inputConverterClassKey)) {
@@ -3030,10 +3033,10 @@
if (!custPropAdded) {
cmds.add(customizedPregelixProperty);
- // Appends default converter classes to asterixdbParameterBuilder.
+ //Appends default converter classes to asterixdbParameterBuilder.
asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
- // Remove the last comma.
+ //Remove the last comma.
asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
asterixdbParameterBuilder.length());
cmds.add(asterixdbParameterBuilder.toString());
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 976ca70..8d020e7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.test.aql.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
new file mode 100644
index 0000000..70322cb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type TwitterUserType as closed {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as closed {
+ tweetid: int64,
+ user: TwitterUserType,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string
+}
+
+create dataset Tweets(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+ ("sockets"="127.0.0.1:10001"),
+ ("address-type"="IP"),
+ ("type-name"="TweetMessageType"),
+ ("format"="adm")
+);
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
new file mode 100644
index 0000000..4d2f9c4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
new file mode 100644
index 0000000..e70df33
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
new file mode 100644
index 0000000..34d6285
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
new file mode 100644
index 0000000..5684b1c
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql
new file mode 100644
index 0000000..547085f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type TwitterUserType as closed {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as closed {
+ tweetid: string,
+ tweetid-copy:string,
+ user: TwitterUserType,
+ sender-location: point,
+ send-time: datetime,
+ send-time-copy:datetime,
+ referred-topics: {{ string }},
+ message-text: string
+}
+
+create dataset Tweets(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+ ("sockets"="127.0.0.1:10001"),
+ ("address-type"="IP"),
+ ("type-name"="TweetMessageType"),
+ ("format"="adm")
+);
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql
new file mode 100644
index 0000000..3d7fdbf
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
new file mode 100644
index 0000000..eb18795
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql
new file mode 100644
index 0000000..578d458
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+start client 10001 file-client localhost data/twitter/tw_messages.adm 500 50 1000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql
new file mode 100644
index 0000000..18bbbbc
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+10000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql
new file mode 100644
index 0000000..0862bae
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql
new file mode 100644
index 0000000..fd8926b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+
+for $x in dataset Tweets
+order by $x.tweetid
+return $x;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql
new file mode 100644
index 0000000..6753868
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+stop 10001
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql
new file mode 100644
index 0000000..1295b97
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
new file mode 100644
index 0000000..7047dbc
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
@@ -0,0 +1,10 @@
+{ "tweetid": "1", "tweetid-copy": "1", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("42.13,80.43"), "send-time": datetime("2005-12-05T21:06:41.000Z"), "send-time-copy": datetime("2005-12-05T21:06:41.000Z"), "referred-topics": {{ "samsung", "plan" }}, "message-text": " love samsung the plan is amazing" }
+{ "tweetid": "10", "tweetid-copy": "10", "user": { "screen-name": "Rolldstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstful", "followers_count": 3311368i32 }, "sender-location": point("46.94,93.98"), "send-time": datetime("2011-04-07T14:08:46.000Z"), "send-time-copy": datetime("2011-04-07T14:08:46.000Z"), "referred-topics": {{ "t-mobile", "signal" }}, "message-text": " like t-mobile the signal is good" }
+{ "tweetid": "2", "tweetid-copy": "2", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "David Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("28.86,70.44"), "send-time": datetime("2007-08-15T06:44:17.000Z"), "send-time-copy": datetime("2007-08-15T06:44:17.000Z"), "referred-topics": {{ "sprint", "voice-clarity" }}, "message-text": " like sprint its voice-clarity is mind-blowing" }
+{ "tweetid": "3", "tweetid-copy": "3", "user": { "screen-name": "RollandEckhard#500", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Hetfield", "followers_count": 3311368i32 }, "sender-location": point("39.84,86.48"), "send-time": datetime("2008-12-24T00:07:04.000Z"), "send-time-copy": datetime("2008-12-24T00:07:04.000Z"), "referred-topics": {{ "verizon", "voice-command" }}, "message-text": " can't stand verizon its voice-command is terrible:(" }
+{ "tweetid": "4", "tweetid-copy": "4", "user": { "screen-name": "RollandEckhardstein#221", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstinz", "followers_count": 3311368i32 }, "sender-location": point("27.67,87.32"), "send-time": datetime("2007-02-05T16:39:13.000Z"), "send-time-copy": datetime("2007-02-05T16:39:13.000Z"), "referred-topics": {{ "t-mobile", "customer-service" }}, "message-text": " love t-mobile its customer-service is mind-blowing" }
+{ "tweetid": "5", "tweetid-copy": "5", "user": { "screen-name": "RollandEcstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardst", "followers_count": 3311368i32 }, "sender-location": point("27.3,92.77"), "send-time": datetime("2010-09-12T06:15:28.000Z"), "send-time-copy": datetime("2010-09-12T06:15:28.000Z"), "referred-topics": {{ "t-mobile", "customization" }}, "message-text": " like t-mobile the customization is amazing:)" }
+{ "tweetid": "6", "tweetid-copy": "6", "user": { "screen-name": "Rollkhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Kirk Hammette ", "followers_count": 3311368i32 }, "sender-location": point("45.62,84.78"), "send-time": datetime("2012-01-23T06:23:13.000Z"), "send-time-copy": datetime("2012-01-23T06:23:13.000Z"), "referred-topics": {{ "iphone", "network" }}, "message-text": " like iphone its network is awesome:)" }
+{ "tweetid": "7", "tweetid-copy": "7", "user": { "screen-name": "andEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland khardstein", "followers_count": 3311368i32 }, "sender-location": point("44.12,81.46"), "send-time": datetime("2012-02-17T17:30:26.000Z"), "send-time-copy": datetime("2012-02-17T17:30:26.000Z"), "referred-topics": {{ "t-mobile", "network" }}, "message-text": " hate t-mobile the network is bad" }
+{ "tweetid": "8", "tweetid-copy": "8", "user": { "screen-name": "Rolltein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Ron Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("36.86,90.71"), "send-time": datetime("2009-03-12T13:18:04.000Z"), "send-time-copy": datetime("2009-03-12T13:18:04.000Z"), "referred-topics": {{ "at&t", "touch-screen" }}, "message-text": " dislike at&t its touch-screen is OMG" }
+{ "tweetid": "9", "tweetid-copy": "9", "user": { "screen-name": "Roldstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckdstein", "followers_count": 3311368i32 }, "sender-location": point("29.07,97.05"), "send-time": datetime("2012-08-15T20:19:46.000Z"), "send-time-copy": datetime("2012-08-15T20:19:46.000Z"), "referred-topics": {{ "verizon", "speed" }}, "message-text": " hate verizon its speed is bad" }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index a3a1fba..96a2c37 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -36,6 +36,16 @@
</compilation-unit>
</test-case> -->
<test-case FilePath="feeds">
+ <compilation-unit name="feed-push-socket">
+ <output-dir compare="Text">feed-push-socket</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="drop-dataverse-with-disconnected-feed">
+ <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="feed-with-external-parser">
<output-dir compare="Text">feed-with-external-parser</output-dir>
</compilation-unit>
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
new file mode 100644
index 0000000..765dc71
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class FileFeedSocketAdapterClient implements ITestClient {
+ private final int port;
+ private final int wait;
+ private final String url;
+ private Socket socket;
+ private String path;
+ private int batchSize;
+ private int maxCount;
+ private OutputStream out = null;
+
+ // expected args: url, source-file-path, max-count, batch-size, wait
+ public FileFeedSocketAdapterClient(int port, String[] args) throws Exception {
+ this.port = port;
+ if (args.length != 5) {
+ throw new Exception(
+ "Invalid arguments for FileFeedSocketAdapterClient. Expected arguments <url> <source-file-path> <max-count> <batch-size> <wait>");
+ }
+ this.url = args[0];
+ this.path = args[1];
+ this.maxCount = Integer.parseInt(args[2]);
+ this.batchSize = Integer.parseInt(args[3]);
+ this.wait = Integer.parseInt(args[4]);
+ }
+
+ @Override
+ public void start() {
+ try {
+ socket = new Socket(url, port);
+ } catch (IOException e) {
+ System.err.println("Problem in creating socket against host " + url + " on the port " + port);
+ e.printStackTrace();
+ }
+
+ int recordCount = 0;
+ BufferedReader br = null;
+ try {
+ out = socket.getOutputStream();
+ br = new BufferedReader(new FileReader(path));
+ String nextRecord;
+ while ((nextRecord = br.readLine()) != null) {
+ ByteBuffer b = StandardCharsets.UTF_8.encode(nextRecord);
+ if (wait >= 1 && recordCount % batchSize == 0) {
+ Thread.sleep(wait);
+ }
+ out.write(b.array(), 0, b.limit());
+ recordCount++;
+ if (recordCount == maxCount) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (IOException e) {
+ System.err.println("Problem in closing socket against host " + url + " on the port " + port);
+ e.printStackTrace();
+ }
+ }
+}
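Note: the test client above is driven from the feed-push-socket scripts (e.g. "start client 10001 file-client localhost data/twitter/tw_messages.adm 500 50 1000"). A minimal sketch of the equivalent programmatic use, relying only on the classes added in this change; the wrapper class name and main method are illustrative only:

    import org.apache.asterix.test.client.ITestClient;
    import org.apache.asterix.test.client.TestClientProvider;

    public class FileClientSketch {
        public static void main(String[] args) throws Exception {
            // Same arguments as the test script: <url> <source-file-path> <max-count> <batch-size> <wait>
            String[] clientArgs = { "file-client", "localhost", "data/twitter/tw_messages.adm", "500", "50", "1000" };
            ITestClient client = TestClientProvider.createTestClient(clientArgs, 10001);
            client.start(); // connect to localhost:10001 and push up to 500 records, pausing 1000 ms per 50-record batch
            client.stop();  // close the output stream and the socket
        }
    }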
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
new file mode 100644
index 0000000..56d626d
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+public interface ITestClient {
+
+ public void start() throws Exception;
+
+ public void stop() throws Exception;
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
new file mode 100644
index 0000000..d26351b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.util.Arrays;
+
+public class TestClientProvider {
+
+ public static ITestClient createTestClient(String[] args, int port) throws Exception {
+ if (args.length < 1) {
+ throw new Exception("Unspecified test client");
+ }
+ String clientName = args[0];
+ String[] clientArgs = Arrays.copyOfRange(args, 1, args.length);
+ switch (clientName) {
+ case "file-client":
+ return new FileFeedSocketAdapterClient(port, clientArgs);
+ default:
+ throw new Exception("Unknown test client: " + clientName);
+ }
+ }
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
index f40cce4..ba32af2 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
@@ -26,10 +26,10 @@
import java.net.Socket;
public class FileTestServer implements ITestServer {
- private String[] paths;
- private final int port;
- private ServerSocket serverSocket;
- private Thread listenerThread;
+ protected String[] paths;
+ protected final int port;
+ protected ServerSocket serverSocket;
+ protected Thread listenerThread;
public FileTestServer(int port) {
this.port = port;
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
index 18a4969..b3b1183 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
@@ -20,7 +20,7 @@
public interface ITestServer {
- public void configure(String[] args);
+ public void configure(String[] args) throws Exception;
public void start() throws Exception;
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
new file mode 100644
index 0000000..1c2cef6
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class OpenSocketFileTestServer extends FileTestServer {
+
+ private boolean closed;
+
+ public OpenSocketFileTestServer(int port) {
+ super(port);
+ }
+
+ @Override
+ public void start() throws IOException {
+ serverSocket = new ServerSocket(port);
+ listenerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!serverSocket.isClosed()) {
+ try {
+ Socket socket = serverSocket.accept();
+ new Thread(new SocketThread(socket)).start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ // Nothing more to do here. The accept() call failed, most likely
+ // because the server socket was closed (typically by stop() calling
+ // serverSocket.close()); the loop condition re-checks whether the
+ // server socket is still open before trying to accept again.
+ }
+ }
+ }
+ });
+ listenerThread.start();
+ }
+
+ private class SocketThread implements Runnable {
+ private Socket socket;
+ private OutputStream os;
+
+ public SocketThread(Socket socket) {
+ this.socket = socket;
+ }
+
+ @Override
+ public void run() {
+ try {
+ os = socket.getOutputStream();
+ byte[] chunk = new byte[1024];
+ for (String path : paths) {
+ try (FileInputStream fin = new FileInputStream(new File(path))) {
+ int read = fin.read(chunk);
+ while (read > 0) {
+ os.write(chunk, 0, read);
+ read = fin.read(chunk);
+ }
+ }
+ }
+ } catch (Throwable th) {
+ th.printStackTrace();
+ // There are two possibilities here:
+ // 1. The socket was closed from the other end.
+ // 2. Server.close() was called.
+ } finally {
+ synchronized (serverSocket) {
+ if (!closed) {
+ try {
+ serverSocket.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ try {
+ os.close();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ }
+ try {
+ socket.close();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws IOException, InterruptedException {
+ synchronized (serverSocket) {
+ closed = true;
+ try {
+ serverSocket.close();
+ if (listenerThread.isAlive()) {
+ listenerThread.join();
+ }
+ } finally {
+ serverSocket.notifyAll();
+ }
+ }
+ }
+}
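A rough lifecycle sketch for the new open-socket server. The only assumption (not shown in this change) is that FileTestServer.configure(...) stores its arguments into the inherited paths field; the wrapper class and main method are illustrative only:

    import org.apache.asterix.test.server.OpenSocketFileTestServer;

    public class OpenSocketServerSketch {
        public static void main(String[] args) throws Exception {
            OpenSocketFileTestServer server = new OpenSocketFileTestServer(10001);
            server.configure(new String[] { "data/twitter/tw_messages.adm" }); // assumed to populate 'paths'
            server.start();     // accept connections and stream each configured file to every client
            Thread.sleep(3000); // give clients time to connect and read
            server.stop();      // close the server socket and wake the per-connection threads
        }
    }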
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
new file mode 100644
index 0000000..3312d1b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import org.apache.asterix.test.client.ITestClient;
+import org.apache.asterix.test.client.TestClientProvider;
+
+public class TestClientServer implements ITestServer {
+
+ // port of the server to connect to
+ private final int port;
+ private ITestClient client;
+
+ public TestClientServer(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public void configure(String[] args) throws Exception {
+ client = TestClientProvider.createTestClient(args, port);
+ }
+
+ @Override
+ public void start() throws Exception {
+ client.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ client.stop();
+ }
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
index 60c1c11..0bdb74e 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
@@ -26,6 +26,10 @@
return new FileTestServer(port);
case "rss":
return new RSSTestServer(port);
+ case "open-socket-file":
+ return new OpenSocketFileTestServer(port);
+ case "client":
+ return new TestClientServer(port);
default:
throw new Exception("Unknown test server");
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d5b1c6eff..851acd42 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -38,7 +38,7 @@
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -66,7 +66,7 @@
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
return dataSourceFactory.getPartitionConstraint();
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 17916e5..3965e5e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -22,7 +22,7 @@
import java.util.Map;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
/**
@@ -50,7 +50,7 @@
* In the former case, the IP address is translated to a node controller id
* running on the node with the given IP address.
*/
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
/**
* Creates an instance of IDatasourceAdapter.
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 370ea93..1487cf1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -19,9 +19,12 @@
package org.apache.asterix.external.api;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Map;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
public interface IExternalDataSourceFactory extends Serializable {
@@ -45,7 +48,7 @@
* @return
* @throws Exception
*/
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
/**
* Configure the data parser factory. The passed map contains key value pairs from the
@@ -63,4 +66,32 @@
return false;
}
+ public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
+ AlgebricksAbsolutePartitionConstraint constraints, int count) {
+ if (constraints == null) {
+ ArrayList<String> locs = new ArrayList<String>();
+ Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+ int i = 0;
+ while (i < count) {
+ for (String node : stores.keySet()) {
+ int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
+ for (int k = 0; k < numIODevices; k++) {
+ locs.add(node);
+ i++;
+ if (i == count) {
+ break;
+ }
+ }
+ if (i == count) {
+ break;
+ }
+ }
+ }
+ String[] cluster = new String[locs.size()];
+ cluster = locs.toArray(cluster);
+ constraints = new AlgebricksAbsolutePartitionConstraint(cluster);
+ }
+ return constraints;
+ }
+
}
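The static helper added above spreads a requested number of partitions across the cluster by visiting each node once per IO device and wrapping around until the count is reached. A self-contained illustration of the same distribution, with a plain map standing in for the AsterixAppContextInfo/AsterixClusterProperties lookups (class and method names here are illustrative only):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class PartitionSpreadSketch {
        // Round-robin over nodes, weighted by IO device count; assumes at least one node with one device.
        static String[] spread(Map<String, Integer> ioDevicesPerNode, int count) {
            List<String> locs = new ArrayList<>();
            while (locs.size() < count) {
                for (Map.Entry<String, Integer> node : ioDevicesPerNode.entrySet()) {
                    for (int k = 0; k < node.getValue() && locs.size() < count; k++) {
                        locs.add(node.getKey());
                    }
                }
            }
            return locs.toArray(new String[0]);
        }

        public static void main(String[] args) {
            Map<String, Integer> cluster = new LinkedHashMap<>();
            cluster.put("nc1", 2);
            cluster.put("nc2", 1);
            // prints [nc1, nc1, nc2, nc1, nc1]
            System.out.println(Arrays.toString(spread(cluster, 5)));
        }
    }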
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index adb2602..fdc54d6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -25,5 +25,4 @@
public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception;
public Class<? extends T> getRecordClass();
-
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 5b3828d..6e3ead2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -40,7 +40,6 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.hdfs.dataflow.ConfFactory;
@@ -51,7 +50,7 @@
implements IInputStreamProviderFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
protected static final long serialVersionUID = 1L;
- protected transient AlgebricksPartitionConstraint clusterLocations;
+ protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
protected String[] readSchedule;
protected boolean read[];
protected InputSplitsFactory inputSplitsFactory;
@@ -76,7 +75,7 @@
JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
confFactory = new ConfFactory(conf);
clusterLocations = getPartitionConstraint();
- int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+ int numPartitions = clusterLocations.getLocations().length;
// if files list was set, we restrict the splits to the list
InputSplit[] inputSplits;
if (files == null) {
@@ -99,7 +98,8 @@
}
}
- // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side
+ // Used to tell the factory to restrict the splits to the intersection between this list and the
+ // actual files on the HDFS side
@Override
public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
this.files = files;
@@ -108,7 +108,8 @@
/*
* The method below was modified to take care of the following
- * 1. when target files are not null, it generates a file aware input stream that validate against the files
+ * 1. when target files are not null, it generates a file-aware input stream that validates
+ * against the files
* 2. if the data is binary, it returns a generic reader
*/
@Override
@@ -135,7 +136,7 @@
* @return
*/
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
return clusterLocations;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
index b9b6f65..b715a26 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -29,7 +29,7 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import com.couchbase.client.core.CouchbaseCore;
@@ -71,7 +71,7 @@
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
return AsterixClusterProperties.INSTANCE.getClusterLocations();
}
@@ -100,7 +100,8 @@
}
/*
- * We distribute the work of streaming vbuckets between all the partitions in a round robin fashion.
+ * We distribute the work of streaming vbuckets between all the partitions in a round robin
+ * fashion.
*/
private void schedule() {
schedule = new int[numOfVBuckets];
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
index c302b9b..22488f7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -28,14 +28,14 @@
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.hdfs.dataflow.ConfFactory;
public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
protected static final long serialVersionUID = 1L;
- protected transient AlgebricksPartitionConstraint clusterLocations;
+ protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
protected ConfFactory confFactory;
protected Map<String, String> configuration;
@@ -48,7 +48,7 @@
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
return clusterLocations;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index bbe485c..beceea8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -22,11 +22,11 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import com.sun.syndication.feed.synd.SyndEntryImpl;
@@ -36,6 +36,7 @@
private static final long serialVersionUID = 1L;
private Map<String, String> configuration;
private List<String> urls = new ArrayList<String>();
+ private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
@Override
public DataSourceType getDataSourceType() {
@@ -43,8 +44,10 @@
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(urls.size());
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+ int count = urls.size();
+ clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, count);
+ return clusterLocations;
}
@Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
index f02bd93..d02de03 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
@@ -29,7 +29,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
public abstract class AbstractStreamRecordReaderFactory<T>
@@ -51,7 +51,7 @@
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
return inputStreamFactory.getPartitionConstraint();
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 9b2d095..f41486e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -77,7 +77,7 @@
inString = false;
depth = 0;
do {
- int startPosn = bufferPosn; //starting from where we left off the last time
+ int startPosn = bufferPosn; // starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = reader.read(inputBuffer);
@@ -87,7 +87,7 @@
}
}
if (!hasStarted) {
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+ for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
if (inputBuffer[bufferPosn] == recordStart) {
startPosn = bufferPosn;
hasStarted = true;
@@ -108,7 +108,7 @@
}
}
if (hasStarted) {
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+ for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
if (inString) {
// we are in a string, we only care about the string end
if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index f38c2cb..a2a4742 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -23,6 +23,7 @@
import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -30,8 +31,7 @@
import org.apache.asterix.external.util.TwitterUtil;
import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import twitter4j.Status;
@@ -46,6 +46,7 @@
private Map<String, String> configuration;
private boolean pull;
+ private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
@Override
public DataSourceType getDataSourceType() {
@@ -53,8 +54,9 @@
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+ clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY);
+ return clusterLocations;
}
@Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index e780c95..89008aa 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -20,16 +20,28 @@
import java.io.IOException;
import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
public class AInputStreamReader extends InputStreamReader {
private AInputStream in;
+ private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+ private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ private CharsetDecoder decoder;
+ private boolean done = false;
public AInputStreamReader(AInputStream in) {
super(in);
this.in = in;
+ this.decoder = StandardCharsets.UTF_8.newDecoder();
+ this.byteBuffer.flip();
}
public boolean skipError() throws Exception {
@@ -51,4 +63,33 @@
public void setFeedLogManager(FeedLogManager feedLogManager) {
in.setFeedLogManager(feedLogManager);
}
+
+ @Override
+ public int read(char cbuf[]) throws IOException {
+ return read(cbuf, 0, cbuf.length);
+ }
+
+ @Override
+ public int read(char cbuf[], int offset, int length) throws IOException {
+ if (done) {
+ return -1;
+ }
+ charBuffer.clear();
+ if (byteBuffer.hasRemaining()) {
+ decoder.decode(byteBuffer, charBuffer, false);
+ System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
+ return charBuffer.position();
+ }
+ int len = in.read(bytes, 0, bytes.length);
+ if (len == -1) {
+ done = true;
+ return len;
+ }
+ byteBuffer.clear();
+ byteBuffer.position(len);
+ byteBuffer.flip();
+ decoder.decode(byteBuffer, charBuffer, false);
+ System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
+ return charBuffer.position();
+ }
}
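For reference, a generic stand-alone pattern for decoding a byte stream into chars with CharsetDecoder. Unlike the reader above, it carries undecoded trailing bytes over to the next fill with compact(), which matters when a multi-byte UTF-8 sequence straddles two reads; this is a general sketch under those assumptions, not part of the change:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.Writer;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.CoderResult;
    import java.nio.charset.StandardCharsets;

    final class DecodeLoopSketch {
        static void copy(InputStream in, Writer out) throws IOException {
            CharsetDecoder dec = StandardCharsets.UTF_8.newDecoder();
            byte[] bytes = new byte[8192];
            ByteBuffer bb = ByteBuffer.wrap(bytes);
            CharBuffer cb = CharBuffer.allocate(8192);
            bb.limit(0); // start with no undecoded bytes
            int n;
            while ((n = in.read(bytes, bb.limit(), bytes.length - bb.limit())) != -1) {
                bb.limit(bb.limit() + n);                   // expose the newly read bytes
                CoderResult cr = dec.decode(bb, cb, false); // may leave a partial sequence in bb
                if (cr.isError()) {
                    cr.throwException();
                }
                out.write(cb.array(), 0, cb.position());
                cb.clear();
                bb.compact();                               // keep undecoded bytes for the next read
                bb.flip();
            }
            dec.decode(bb, cb, true);                       // drain any final decoder state
            dec.flush(cb);
            out.write(cb.array(), 0, cb.position());
        }
    }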
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
index 1e86f39..cf8d339 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
@@ -25,7 +25,9 @@
import java.util.Map;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SocketInputStream extends AInputStream {
private ServerSocket server;
@@ -34,8 +36,13 @@
public SocketInputStream(ServerSocket server) throws IOException {
this.server = server;
- socket = server.accept();
- connectionStream = socket.getInputStream();
+ socket = new Socket();
+ connectionStream = new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return -1;
+ }
+ };
}
@Override
@@ -56,20 +63,31 @@
@Override
public int read(byte b[]) throws IOException {
- int read = connectionStream.read(b, 0, b.length);
- while (read < 0) {
- accept();
- read = connectionStream.read(b, 0, b.length);
- }
- return read;
+ return read(b, 0, b.length);
}
@Override
public int read(byte b[], int off, int len) throws IOException {
- int read = connectionStream.read(b, off, len);
- while (read < 0) {
- accept();
+ if (server == null) {
+ return -1;
+ }
+ int read = -1;
+ try {
read = connectionStream.read(b, off, len);
+ } catch (IOException e) {
+ e.printStackTrace();
+ read = -1;
+ }
+ while (read < 0) {
+ if (!accept()) {
+ return -1;
+ }
+ try {
+ read = connectionStream.read(b, off, len);
+ } catch (IOException e) {
+ e.printStackTrace();
+ read = -1;
+ }
}
return read;
}
@@ -85,22 +103,57 @@
}
@Override
- public void close() throws IOException {
- connectionStream.close();
- socket.close();
- server.close();
+ public synchronized void close() throws IOException {
+ HyracksDataException hde = null;
+ try {
+ if (connectionStream != null) {
+ connectionStream.close();
+ }
+ connectionStream = null;
+ } catch (IOException e) {
+ hde = new HyracksDataException(e);
+ }
+ try {
+ if (socket != null) {
+ socket.close();
+ }
+ socket = null;
+ } catch (IOException e) {
+ hde = ExternalDataExceptionUtils.suppress(hde, e);
+ }
+ try {
+ if (server != null) {
+ server.close();
+ }
+ } catch (IOException e) {
+ hde = ExternalDataExceptionUtils.suppress(hde, e);
+ } finally {
+ server = null;
+ }
+ if (hde != null) {
+ throw hde;
+ }
}
- private void accept() throws IOException {
- connectionStream.close();
- socket.close();
- socket = server.accept();
- connectionStream = socket.getInputStream();
+ private boolean accept() throws IOException {
+ try {
+ connectionStream.close();
+ connectionStream = null;
+ socket.close();
+ socket = null;
+ socket = server.accept();
+ connectionStream = socket.getInputStream();
+ return true;
+ } catch (Exception e) {
+ close();
+ return false;
+ }
}
@Override
public boolean stop() throws Exception {
- return false;
+ close();
+ return true;
}
@Override
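With the rework above, a caller blocks in read() while the stream re-accepts clients and only sees -1 once the stream has been closed or stopped (or a new accept fails). A minimal consumer sketch using only the API shown in this change; the method wrapper and buffer size are illustrative, and imports for SocketInputStream, java.io, and java.net are assumed:

    // Drain whatever connected clients push until the stream is stopped.
    static void drain() throws IOException {
        ServerSocket server = new ServerSocket(10001);
        SocketInputStream in = new SocketInputStream(server);
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {
            // hand the n bytes to the parser / data-flow controller
        }
        in.close(); // closes the connection stream, the socket, and the server socket
    }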
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index 3f70ce1..5c1583e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -34,7 +34,6 @@
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.NodeResolverFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -48,7 +47,8 @@
protected static INodeResolver nodeResolver;
protected Map<String, String> configuration;
protected FileSplit[] inputFileSplits;
- protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
+ protected FileSplit[] feedLogFileSplits; // paths that instances of this feed can use as log
+ // storage
protected boolean isFeed;
protected String expression;
// transient fields (They don't need to be serialized and transferred)
@@ -84,7 +84,7 @@
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
return constraints;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
index ea60f43..6fdc42d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -35,7 +35,6 @@
import org.apache.asterix.om.util.AsterixRuntimeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -106,7 +105,7 @@
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
List<String> locations = new ArrayList<String>();
for (Pair<String, Integer> socket : sockets) {
locations.add(socket.first);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
index 484626a..95378cb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
@@ -27,7 +27,6 @@
import org.apache.asterix.external.input.stream.provider.TwitterFirehoseInputStreamProvider;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
/**
@@ -54,7 +53,7 @@
private Map<String, String> configuration;
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
String[] locations = null;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index e39b507..cd4a3c1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -58,7 +58,6 @@
@Override
public AInputStream getInputStream() throws Exception {
- twitterServer.start();
return twitterServer;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 7e28c35..d0348c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -149,7 +149,7 @@
private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
int waitCycleCount = 0;
ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
- while (ingestionRuntime == null && waitCycleCount < 10) {
+ while (ingestionRuntime == null && waitCycleCount < 1000) {
try {
Thread.sleep(3000);
waitCycleCount++;
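
For context, the widened bound above keeps the collector polling for the intake runtime instead of giving up after roughly 30 seconds (10 cycles of 3000 ms). A minimal, self-contained sketch of the same bounded-wait pattern; the names BoundedWait, waitFor, MAX_WAIT_CYCLES and WAIT_MS are illustrative assumptions, not part of this patch:

// Polls a supplier until it yields a non-null value or the cycle budget runs out.
// MAX_WAIT_CYCLES and WAIT_MS are illustrative; the patch itself uses 1000 cycles of 3000 ms.
import java.util.function.Supplier;

public final class BoundedWait {
    private static final int MAX_WAIT_CYCLES = 1000;
    private static final long WAIT_MS = 3000L;

    public static <T> T waitFor(Supplier<T> lookup) throws InterruptedException {
        int cycles = 0;
        T result = lookup.get();
        while (result == null && cycles < MAX_WAIT_CYCLES) {
            Thread.sleep(WAIT_MS);   // back off before retrying
            cycles++;
            result = lookup.get();   // re-check after the pause
        }
        return result;               // may still be null if the budget is exhausted
    }
}
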
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 3cb5d64..36c11e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -241,7 +241,8 @@
FeedRuntimeId runtimeId = null;
FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
if (endFeedMessage.isCompleteDisconnection()) {
- // subscribableRuntimeType represents the location at which the feed connection receives data
+ // subscribableRuntimeType represents the location at which the feed connection receives
+ // data
FeedRuntimeType runtimeType = null;
switch (subscribableRuntimeType) {
case INTAKE:
@@ -257,15 +258,19 @@
runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
.getFeedRuntime(connectionId, runtimeId);
- feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+ if (feedRuntime != null) {
+ feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+ }
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
}
} else {
- // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+ // subscribableRuntimeType represents the location for data hand-off in presence of
+ // subscribers
switch (subscribableRuntimeType) {
case INTAKE:
- // illegal state as data hand-off from one feed to another does not happen at intake
+ // illegal state as data hand-off from one feed to another does not happen at
+ // intake
throw new IllegalStateException("Illegal State, invalid runtime type " + subscribableRuntimeType);
case COMPUTE:
// feed could be primary or secondary, doesn't matter
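
The added null check earlier in this hunk tolerates a disconnect message that arrives when the collection runtime was never registered (for example, a disconnect racing ahead of the connect). A minimal sketch of the same defensive pattern; the ConnectionRegistry type and its method names below are hypothetical stand-ins, not the feed manager's actual API:

// Unsubscribes a runtime only if the registry actually knows about it;
// a missing entry is treated as "already disconnected" rather than an error.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ConnectionRegistry {
    // Hypothetical registry keyed by a (connection, runtime) identifier string.
    private final Map<String, Runnable> unsubscribers = new ConcurrentHashMap<>();

    void register(String runtimeId, Runnable unsubscriber) {
        unsubscribers.put(runtimeId, unsubscriber);
    }

    void disconnect(String runtimeId) {
        Runnable unsubscriber = unsubscribers.remove(runtimeId);
        if (unsubscriber != null) {  // guard: the runtime may never have been registered
            unsubscriber.run();
        }
    }
}
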
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index c128545..50d8ac0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -58,13 +58,12 @@
public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
AlgebricksPartitionConstraint partitionConstraints) throws Exception {
- File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
- String[] locations = null;
if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
throw new AlgebricksException("Can't create file splits for adapter with count partitioning constraints");
- } else {
- locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
}
+ File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+ String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
List<FileSplit> splits = new ArrayList<FileSplit>();
String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
int i = 0;
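
The reordering above rejects count-constrained adapters before the absolute locations are read, and the loop that follows builds one split per location. A rough sketch of that per-location layout using only java.io types; the method name, directory layout, and String[] pairs below are assumptions for illustration, not the exact paths or FileSplit objects FeedUtils produces:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

final class FeedSplitSketch {
    // Builds one <nodeId, path> pair per ingestion location, mirroring the idea of
    // splitsForAdapter: every absolute partition gets its own feed log directory.
    static List<String[]> splitsFor(String dataverse, String feed, String[] locations, String storageDir) {
        List<String[]> splits = new ArrayList<>();
        for (String nodeId : locations) {
            File relPath = new File(storageDir, dataverse + File.separator + feed);
            splits.add(new String[] { nodeId, new File(relPath, nodeId).getPath() });
        }
        return splits;
    }
}
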
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 7ac0428..9a72135 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -42,7 +42,6 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
@@ -199,8 +198,8 @@
return conf;
}
- public static AlgebricksPartitionConstraint getPartitionConstraints(
- AlgebricksPartitionConstraint clusterLocations) {
+ public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
+ AlgebricksAbsolutePartitionConstraint clusterLocations) {
if (clusterLocations == null) {
ArrayList<String> locs = new ArrayList<String>();
Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index e34a09b..6b11d21 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -23,14 +23,14 @@
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.asterix.external.parser.ADMDataParser;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,14 +48,17 @@
private Map<String, String> configuration;
+ private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+
@Override
public String getAlias() {
return "test_typed";
}
@Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(1);
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+ clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1);
+ return clusterLocations;
}
@Override