Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy)
@@ -25,8 +25,11 @@
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
 import edu.uci.ics.asterix.om.base.ARecord;
 import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;

@@ -40,12 +43,18 @@
     // Payload field containing serialized Dataverse.
     public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;

+    private AMutableInt32 aInt32;
+    protected ISerializerDeserializer<AInt32> aInt32Serde;
+
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);

+    @SuppressWarnings("unchecked")
     public DataverseTupleTranslator(boolean getTuple) {
         super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
+        aInt32 = new AMutableInt32(-1);
+        aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
     }

     @Override
@@ -57,7 +66,8 @@
         DataInput in = new DataInputStream(stream);
         ARecord dataverseRecord = recordSerDes.deserialize(in);
         return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
-                ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
+                ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
+                ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
     }

     @Override
@@ -88,6 +98,12 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);

+        // write field 3
+        fieldValue.reset();
+        aInt32.setValue(instance.getPendingOp());
+        aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();

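One thing to verify in the hunk above: the writer places PendingOp at DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, which MetadataRecordTypes (further down in this patch) defines as 3, after Timestamp, while the deserializer reads getValueByPos(2). Writer and reader must agree on the position, or the AInt32 cast fails at read time. A plain-JDK sketch of that contract; every name below is illustrative, not part of the patch:

import java.util.Arrays;

// Illustration only: the tuple translators rely on writer and reader agreeing
// on field positions. With the record laid out as
// { DataverseName, DataFormat, Timestamp, PendingOp }, PendingOp sits at
// position 3; reading position 2 would hand back the Timestamp string.
final class DataverseRecordSketch {
    static Object[] write(String name, String format, String timestamp, int pendingOp) {
        return new Object[] { name, format, timestamp, pendingOp }; // positions 0..3
    }

    static int readPendingOp(Object[] record) {
        return (Integer) record[3]; // must match the writer's PendingOp position
    }

    public static void main(String[] args) {
        Object[] rec = write("Metadata", "NonTaggedDataFormat", "Sat Jan 26 2013", 0);
        System.out.println(Arrays.toString(rec) + " -> pendingOp=" + readPendingOp(rec));
    }
}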
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy)
@@ -77,9 +77,9 @@
     protected ISerializerDeserializer<AInt32> aInt32Serde;

     @SuppressWarnings("unchecked")
-    public DatasetTupleTranslator(boolean getTuple) {
+    public DatasetTupleTranslator(boolean getTuple) {
         super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
-        aInt32 = new AMutableInt32(-1);
+        aInt32 = new AMutableInt32(-1);
         aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
     }

@@ -104,8 +104,10 @@
                 .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
         DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
         IDatasetDetails datasetDetails = null;
-        int datasetId = ((AInt32) datasetRecord
+        int datasetId = ((AInt32) datasetRecord
                 .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
+        int pendingOp = ((AInt32) datasetRecord
+                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
         switch (datasetType) {
             case FEED:
             case INTERNAL: {
@@ -197,7 +199,7 @@
             }
             datasetDetails = new ExternalDatasetDetails(adapter, properties);
         }
-        return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
+        return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
     }

     @Override
@@ -248,13 +250,19 @@
         aString.setValue(Calendar.getInstance().getTime().toString());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-
+
         // write field 8
         fieldValue.reset();
         aInt32.setValue(dataset.getDatasetId());
         aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
-
+
+        // write field 9
+        fieldValue.reset();
+        aInt32.setValue(dataset.getPendingOp());
+        aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();
@@ -290,13 +298,15 @@
         fieldValue.reset();
         aString.setValue(name);
         stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
+        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
+                fieldValue);

         // write field 1
         fieldValue.reset();
         aString.setValue(value);
         stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
+        propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
+                fieldValue);

         propertyRecordBuilder.write(out, true);
     }
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy)
@@ -96,13 +96,15 @@
         }
         Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
                 .getBoolean();
+        int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
+                .getIntegerValue();
         // Check if there is a gram length as well.
         int gramLength = -1;
         int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
         if (gramLenPos >= 0) {
             gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
         }
-        return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
+        return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
     }

     @Override
@@ -174,7 +176,12 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);

-        // write optional field 7
+        // write field 7
+        fieldValue.reset();
+        intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
+
+        // write optional field 8
         if (instance.getGramLength() > 0) {
             fieldValue.reset();
             nameValue.reset();
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy)
@@ -129,7 +129,7 @@

 public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
     private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
-    private final MetadataTransactionContext mdTxnCtx;
+    private MetadataTransactionContext mdTxnCtx;
     private boolean isWriteTransaction;
     private Map<String, String[]> stores;
     private Map<String, String> config;
@@ -156,8 +156,7 @@
         return config;
     }

-    public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
-        this.mdTxnCtx = mdTxnCtx;
+    public AqlMetadataProvider(Dataverse defaultDataverse) {
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixProperties.INSTANCE.getStores();
     }
@@ -181,6 +180,10 @@
     public void setWriterFactory(IAWriterFactory writerFactory) {
         this.writerFactory = writerFactory;
     }
+
+    public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
+        this.mdTxnCtx = mdTxnCtx;
+    }

     public MetadataTransactionContext getMetadataTxnContext() {
         return mdTxnCtx;
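The provider used to be bound to a single metadata transaction for its whole lifetime; making mdTxnCtx non-final with a setter lets one provider be re-pointed at a fresh transaction for each step of a DDL statement. A plain-JDK sketch of that lifecycle, with Tx and Provider as hypothetical stand-ins for MetadataTransactionContext and AqlMetadataProvider:

final class ProviderLifecycleSketch {
    // Stand-in for MetadataTransactionContext; purely illustrative.
    static final class Tx {
        final int id;
        Tx(int id) { this.id = id; }
        void commit() { System.out.println("commit txn " + id); }
    }

    static final class Provider {
        private Tx mdTxnCtx; // no longer final: swapped per step
        void setMetadataTxnContext(Tx tx) { this.mdTxnCtx = tx; }
        Tx getMetadataTxnContext() { return mdTxnCtx; }
    }

    public static void main(String[] args) {
        Provider provider = new Provider();
        for (int step = 1; step <= 2; step++) { // e.g. create the artifact, then load it
            Tx tx = new Tx(step);               // MetadataManager.beginTransaction()
            provider.setMetadataTxnContext(tx);
            tx.commit();                        // commit before running the Hyracks job
        }
    }
}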
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy)
@@ -35,15 +35,18 @@
     private final DatasetType datasetType;
     private IDatasetDetails datasetDetails;
     private final int datasetId;
+    // Type of pending operations with respect to atomic DDL operation
+    private final int pendingOp;

     public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
-            DatasetType datasetType, int datasetId) {
+            DatasetType datasetType, int datasetId, int pendingOp) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.itemTypeName = itemTypeName;
         this.datasetType = datasetType;
         this.datasetDetails = datasetDetails;
         this.datasetId = datasetId;
+        this.pendingOp = pendingOp;
     }

     public String getDataverseName() {
@@ -73,6 +76,10 @@
     public int getDatasetId() {
         return datasetId;
     }
+
+    public int getPendingOp() {
+        return pendingOp;
+    }

     @Override
     public Object addToCache(MetadataCache cache) {
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy)
@@ -45,9 +45,11 @@
     private final boolean isPrimaryIndex;
     // Specific to NGRAM indexes.
     private final int gramLength;
+    // Type of pending operations with respect to atomic DDL operation
+    private final int pendingOp;

     public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
-            List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
+            List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.indexName = indexName;
@@ -55,10 +57,11 @@
         this.keyFieldNames = keyFieldNames;
         this.gramLength = gramLength;
         this.isPrimaryIndex = isPrimaryIndex;
+        this.pendingOp = pendingOp;
     }

     public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
-            List<String> keyFieldNames, boolean isPrimaryIndex) {
+            List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.indexName = indexName;
@@ -66,6 +69,7 @@
         this.keyFieldNames = keyFieldNames;
         this.gramLength = -1;
         this.isPrimaryIndex = isPrimaryIndex;
+        this.pendingOp = pendingOp;
     }

     public String getDataverseName() {
@@ -95,6 +99,10 @@
     public boolean isPrimaryIndex() {
         return isPrimaryIndex;
     }
+
+    public int getPendingOp() {
+        return pendingOp;
+    }

     public boolean isSecondaryIndex() {
         return !isPrimaryIndex();
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy)
@@ -27,10 +27,12 @@
     // Enforced to be unique within an Asterix cluster..
     private final String dataverseName;
     private final String dataFormat;
+    private final int pendingOp;

-    public Dataverse(String dataverseName, String format) {
+    public Dataverse(String dataverseName, String format, int pendingOp) {
         this.dataverseName = dataverseName;
         this.dataFormat = format;
+        this.pendingOp = pendingOp;
     }

     public String getDataverseName() {
@@ -40,6 +42,10 @@
     public String getDataFormat() {
         return dataFormat;
     }
+
+    public int getPendingOp() {
+        return pendingOp;
+    }

     @Override
     public Object addToCache(MetadataCache cache) {
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy)
@@ -25,6 +25,7 @@
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
 import edu.uci.ics.asterix.metadata.api.IMetadataNode;
 import edu.uci.ics.asterix.metadata.api.IValueExtractor;
@@ -160,7 +161,7 @@
         // Add the primary index for the dataset.
         InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
         Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
-                dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
+                dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
         addIndex(jobId, primaryIndex);
         ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
                 dataset.getDatasetName());
@@ -260,7 +261,7 @@
         IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
         TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+        //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
         // TODO: fix exceptions once new BTree exception model is in hyracks.
         indexAccessor.insert(tuple);
         //TODO: extract the key from the tuple and get the PKHashValue from the key.
@@ -536,7 +537,7 @@
         // The transaction with txnId will have an S lock on the
         // resource. Note that lock converters have a higher priority than
         // regular waiters in the LockManager.
-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
+        //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
         indexAccessor.delete(tuple);
         //TODO: extract the key from the tuple and get the PKHashValue from the key.
         //check how to get the oldValue.
@@ -803,7 +804,9 @@
     private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
             IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
         TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-        transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
+        //#. currently lock is not needed to access any metadata
+        // since the non-compatible concurrent access is always protected by the latch in the MetadataManager.
+        //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
         long resourceID = index.getResourceID();
         IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
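The S/X lock calls are disabled above because, after this patch, non-compatible concurrent metadata access is serialized by a read/write latch held around each handler (the cacheLatch added to AqlTranslator further down). A plain-JDK sketch of that protection pattern, assuming queries take the read side and DDL the write side:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: readers share the read latch while DDL handlers take
// the write latch, so index-level S/X locks in MetadataNode become
// unnecessary for metadata access.
final class MetadataLatchSketch {
    private final ReadWriteLock cacheLatch = new ReentrantReadWriteLock(true); // fair, as in AqlTranslator

    void readPath(Runnable body) {
        cacheLatch.readLock().lock();
        try {
            body.run();
        } finally {
            cacheLatch.readLock().unlock(); // always released, even on abort
        }
    }

    void ddlPath(Runnable body) {
        cacheLatch.writeLock().lock();
        try {
            body.run();
        } finally {
            cacheLatch.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        MetadataLatchSketch latch = new MetadataLatchSketch();
        latch.readPath(() -> System.out.println("lookup dataverse"));
        latch.ddlPath(() -> System.out.println("create dataset"));
    }
}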
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy)
@@ -20,6 +20,11 @@
 import edu.uci.ics.asterix.metadata.MetadataCache;

 public interface IMetadataEntity extends Serializable {
+
+    public static final int PENDING_NO_OP = 0;
+    public static final int PENDING_ADD_OP = 1;
+    public static final int PENDING_DROP_OP = 2;
+
     Object addToCache(MetadataCache cache);

     Object dropFromCache(MetadataCache cache);
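These three constants give every metadata entity a small state machine: an entity is born as PENDING_ADD_OP, flipped to PENDING_NO_OP once its physical artifact exists, and marked PENDING_DROP_OP while being torn down, so a half-created artifact can be told apart from a committed one after a crash. A hedged sketch of how a recovery pass might interpret the flag (the recovery routine itself is not part of this patch):

// Illustration, not the patch's recovery code: what the three pendingOp
// states let a restart pass decide about each metadata entity.
final class PendingOpSketch {
    static final int PENDING_NO_OP = 0;   // committed; artifact and metadata agree
    static final int PENDING_ADD_OP = 1;  // metadata written, artifact may be missing
    static final int PENDING_DROP_OP = 2; // drop started, artifact may linger

    static String recoveryAction(int pendingOp) {
        switch (pendingOp) {
            case PENDING_NO_OP:
                return "keep";
            case PENDING_ADD_OP:
                return "roll back: drop metadata entry and any partial artifact";
            case PENDING_DROP_OP:
                return "roll forward: finish dropping artifact and metadata entry";
            default:
                throw new IllegalArgumentException("unknown pendingOp " + pendingOp);
        }
    }

    public static void main(String[] args) {
        for (int op = 0; op <= 2; op++) {
            System.out.println(op + " -> " + recoveryAction(op));
        }
    }
}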
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy)
@@ -17,6 +17,8 @@

 import java.rmi.RemoteException;
 import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
@@ -79,11 +81,10 @@
 public class MetadataManager implements IMetadataManager {
     // Set in init().
     public static MetadataManager INSTANCE;
-
     private final MetadataCache cache = new MetadataCache();
     private IAsterixStateProxy proxy;
     private IMetadataNode metadataNode;
-
+
     public MetadataManager(IAsterixStateProxy proxy) {
         if (proxy == null) {
             throw new Error("Null proxy given to MetadataManager.");
@@ -206,11 +207,14 @@

     @Override
     public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
+        // add dataset into metadataNode
         try {
             metadataNode.addDataset(ctx.getJobId(), dataset);
         } catch (RemoteException e) {
             throw new MetadataException(e);
         }
+
+        // reflect the dataset into the cache
         ctx.addDataset(dataset);
     }

@@ -585,4 +589,5 @@
         }
         return adapter;
     }
+
 }
\ No newline at end of file
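addDataset above is representative of the manager's write path: persist through the remote MetadataNode first, then mirror the change into the transaction-local cache so later statements in the same transaction see it. A plain-JDK sketch of that two-step, with Store and Cache standing in for IMetadataNode and MetadataTransactionContext:

import java.util.ArrayList;
import java.util.List;

// Illustrative write-through: durable store first, cache second. If the
// store call throws, the cache is never touched, so the two stay consistent.
final class WriteThroughSketch {
    interface Store { void add(String dataset) throws Exception; }

    static final class Cache {
        final List<String> entries = new ArrayList<>();
        void add(String dataset) { entries.add(dataset); }
    }

    static void addDataset(Store store, Cache txnCache, String dataset) throws Exception {
        store.add(dataset);    // metadataNode.addDataset(...): may fail remotely
        txnCache.add(dataset); // ctx.addDataset(...): only after the store succeeded
    }

    public static void main(String[] args) throws Exception {
        Cache cache = new Cache();
        addDataset(ds -> System.out.println("stored " + ds), cache, "Customers");
        System.out.println("cached: " + cache.entries);
    }
}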
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy)
@@ -19,6 +19,7 @@

 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -104,19 +105,19 @@
     }

     public void dropDataset(String dataverseName, String datasetName) {
-        Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
+        Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
         droppedCache.addDatasetIfNotExists(dataset);
         logAndApply(new MetadataLogicalOperation(dataset, false));
     }

     public void dropIndex(String dataverseName, String datasetName, String indexName) {
-        Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
+        Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
         droppedCache.addIndexIfNotExists(index);
         logAndApply(new MetadataLogicalOperation(index, false));
     }

     public void dropDataverse(String dataverseName) {
-        Dataverse dataverse = new Dataverse(dataverseName, null);
+        Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
         droppedCache.addDataverseIfNotExists(dataverse);
         logAndApply(new MetadataLogicalOperation(dataverse, false));
     }
@@ -162,7 +163,7 @@
         }
         return droppedCache.getDataset(dataverseName, datasetName) != null;
     }
-
+
     public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
         if (droppedCache.getDataverse(dataverseName) != null) {
             return true;
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy)
@@ -80,10 +80,11 @@
     public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
     public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
     public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
+    public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;

     private static final ARecordType createDataverseRecordType() {
-        return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
+        return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
     }

     // Helper constants for accessing fields in an ARecord of anonymous type
@@ -158,10 +159,11 @@
     public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
     public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
     public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
+    public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;

     private static final ARecordType createDatasetRecordType() {
         String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
-                "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
+                "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };

         List<IAType> internalRecordUnionList = new ArrayList<IAType>();
         internalRecordUnionList.add(BuiltinType.ANULL);
@@ -179,7 +181,8 @@
         AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);

         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
+                internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
+                BuiltinType.AINT32 };
         return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
     }

@@ -264,13 +267,14 @@
     public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
     public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
     public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
+    public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;

     private static final ARecordType createIndexRecordType() {
         AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
         String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
-                "IsPrimary", "Timestamp" };
+                "IsPrimary", "Timestamp", "PendingOp" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
+                olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
         return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
     };

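In each record type the new PendingOp field is appended after the existing fields, so every previously defined *_FIELD_INDEX constant keeps its value and only code that reads or writes PendingOp has to change. A small plain-JDK sketch of that invariant, using parallel name arrays the way the record types do (illustrative names):

// Appending a field keeps all existing positional constants stable; only
// code that touches the new last position needs to know about it.
final class AppendFieldSketch {
    public static void main(String[] args) {
        String[] v0 = { "DataverseName", "DataFormat", "Timestamp" };
        String[] v1 = { "DataverseName", "DataFormat", "Timestamp", "PendingOp" };
        for (int i = 0; i < v0.length; i++) {
            if (!v0[i].equals(v1[i])) {
                throw new AssertionError("existing field index " + i + " moved");
            }
        }
        System.out.println("PendingOp appended at index " + (v1.length - 1)
                + "; indexes 0.." + (v0.length - 1) + " unchanged");
    }
}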
Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
===================================================================
--- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061)
+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy)
@@ -31,6 +31,7 @@
 import edu.uci.ics.asterix.metadata.IDatasetDetails;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
 import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -226,7 +227,7 @@
     public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
         String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
         String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
-        MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
+        MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
     }

     public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
@@ -236,7 +237,7 @@
                     primaryIndexes[i].getNodeGroupName());
             MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
                     primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
-                    id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
+                    id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
         }
     }

@@ -267,7 +268,7 @@
         for (int i = 0; i < secondaryIndexes.length; i++) {
             MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
                     secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
-                    secondaryIndexes[i].getPartitioningExpr(), false));
+                    secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
         }
     }

Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
===================================================================
--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061)
+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
@@ -95,11 +95,11 @@
         File testFile = tcCtx.getTestFile(cUnit);

         /*************** to avoid run failure cases ****************
-        if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
+        if (!testFile.getAbsolutePath().contains("index-selection/")) {
             continue;
         }
         ************************************************************/
-
+
         File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
         File actualFile = new File(PATH_ACTUAL + File.separator
                 + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
===================================================================
--- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061)
+++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy)
@@ -90,7 +90,7 @@

     private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());

-    public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+    public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
             AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
             ACIDException, AsterixException {

@@ -111,67 +111,10 @@
             throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
         }
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return new JobSpecification[0];
+            return new JobSpecification();
         }
-
-        List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
-                dataset.getDatasetName());
-        int numSecondaryIndexes = 0;
-        for (Index index : datasetIndexes) {
-            if (index.isSecondaryIndex()) {
-                numSecondaryIndexes++;
-            }
-        }
-        JobSpecification[] specs;
-        if (numSecondaryIndexes > 0) {
-            specs = new JobSpecification[numSecondaryIndexes + 1];
-            int i = 0;
-            // First, drop secondary indexes.
-            for (Index index : datasetIndexes) {
-                if (index.isSecondaryIndex()) {
-                    specs[i] = new JobSpecification();
-                    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
-                            .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
-                                    datasetName, index.getIndexName());
-                    IIndexDataflowHelperFactory dfhFactory;
-                    switch (index.getIndexType()) {
-                        case BTREE:
-                            dfhFactory = new LSMBTreeDataflowHelperFactory(
-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
-                            break;
-                        case RTREE:
-                            dfhFactory = new LSMRTreeDataflowHelperFactory(
-                                    new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
-                                    new IBinaryComparatorFactory[] { null },
-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
-                            break;
-                        case NGRAM_INVIX:
-                        case WORD_INVIX:
-                            dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
-                            break;
-                        default:
-                            throw new AsterixException("Unknown index type provided.");
-                    }
-                    IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
-                            AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
-                            AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
-                    AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
-                            idxSplitsAndConstraint.second);
-                    i++;
-                }
-            }
-        } else {
-            specs = new JobSpecification[1];
-        }
+
         JobSpecification specPrimary = new JobSpecification();
-        specs[specs.length - 1] = specPrimary;

         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
@@ -187,7 +130,7 @@

         specPrimary.addRoot(primaryBtreeDrop);

-        return specs;
+        return specPrimary;
     }

     public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
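After this change createDropDatasetJobSpec only ever produces the primary-index drop job; secondary-index drops are compiled by the caller (see the dataverse-drop handler in AqlTranslator below), which collects the specs into jobsToExecute and runs each one under its own metadata transaction. A plain-JDK sketch of that caller-side composition, with Runnable standing in for the Hyracks job types and the index names purely hypothetical:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: the caller now owns the ordering -- secondary-index
// drop jobs first, the primary drop job last -- instead of receiving a
// pre-built JobSpecification[] from DatasetOperations.
final class DropJobCompositionSketch {
    public static void main(String[] args) {
        List<Runnable> jobsToExecute = new ArrayList<>();
        String[] secondaryIndexes = { "nameIdx", "ngramIdx" }; // hypothetical
        for (String idx : secondaryIndexes) {
            jobsToExecute.add(() -> System.out.println("drop secondary index " + idx));
        }
        jobsToExecute.add(() -> System.out.println("drop primary index"));
        jobsToExecute.forEach(Runnable::run); // runJob(hcc, spec) per entry
    }
}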
Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
===================================================================
--- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061)
+++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy)
@@ -21,6 +21,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.json.JSONException;

@@ -68,6 +70,7 @@
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -112,6 +115,7 @@
     private final PrintWriter out;
     private final SessionConfig sessionConfig;
     private final DisplayFormat pdf;
+    private final ReadWriteLock cacheLatch;
     private Dataverse activeDefaultDataverse;
     private List<FunctionDecl> declaredFunctions;

@@ -121,6 +125,7 @@
         this.out = out;
         this.sessionConfig = pc;
         this.pdf = pdf;
+        this.cacheLatch = new ReentrantReadWriteLock(true);
         declaredFunctions = getDeclaredFunctions(aqlStatements);
     }

@@ -143,8 +148,7 @@

         for (Statement stmt : aqlStatements) {
             validateOperation(activeDefaultDataverse, stmt);
-            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
+            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
             metadataProvider.setWriterFactory(writerFactory);
             metadataProvider.setOutputFile(outputFile);
             metadataProvider.setConfig(config);
@@ -253,15 +257,9 @@
                 }

             }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
             throw new AlgebricksException(e);
         }
-        // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
-        for (JobSpecification jobspec : jobsToExecute) {
-            runJob(hcc, jobspec);
-        }
     }
     return executionResult;
 }
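The translator no longer wraps the whole statement loop in a single metadata transaction; each handler now begins, commits, or aborts its own transaction and brackets itself with the cacheLatch. The skeleton every handler below follows, sketched with plain JDK types (the commented MetadataManager calls are the real API from this patch; everything else is a stand-in):

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Skeleton of the per-handler pattern used throughout AqlTranslator below:
// begin a metadata txn, take the latch, do the work, commit; abort on any
// exception; always release the latch in finally.
final class HandlerSkeletonSketch {
    private final ReadWriteLock cacheLatch = new ReentrantReadWriteLock(true);

    void handleDdlStatement(Runnable work) throws Exception {
        Object mdTxnCtx = begin();           // MetadataManager.INSTANCE.beginTransaction()
        cacheLatch.writeLock().lock();       // acquireWriteLatch()
        try {
            work.run();                      // mutate metadata via the provider
            commit(mdTxnCtx);                // MetadataManager.INSTANCE.commitTransaction(mdTxnCtx)
        } catch (Exception e) {
            abort(mdTxnCtx);                 // MetadataManager.INSTANCE.abortTransaction(mdTxnCtx)
            throw e;
        } finally {
            cacheLatch.writeLock().unlock(); // releaseWriteLatch()
        }
    }

    private Object begin() { return new Object(); }
    private void commit(Object tx) { }
    private void abort(Object tx) { }
}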
@@ -289,398 +287,802 @@

     private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
-        DataverseDecl dvd = (DataverseDecl) stmt;
-        String dvName = dvd.getDataverseName().getValue();
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-        if (dv == null) {
-            throw new MetadataException("Unknown dataverse " + dvName);
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireReadLatch();
+
+        try {
+            DataverseDecl dvd = (DataverseDecl) stmt;
+            String dvName = dvd.getDataverseName().getValue();
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+            if (dv == null) {
+                throw new MetadataException("Unknown dataverse " + dvName);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return dv;
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new MetadataException(e);
+        } finally {
+            releaseReadLatch();
         }
-        return dv;
     }

     private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
             ACIDException {
-        CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-        String dvName = stmtCreateDataverse.getDataverseName().getValue();
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-        if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
-            throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+            String dvName = stmtCreateDataverse.getDataverseName().getValue();
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+            if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
+                throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
+            }
+            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
+                    stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
-        MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-                stmtCreateDataverse.getFormat()));
     }

     private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
-        DatasetDecl dd = (DatasetDecl) stmt;
-        String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
-                : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
-        if (dataverseName == null) {
-            throw new AlgebricksException(" dataverse not specified ");
-        }
-        String datasetName = dd.getName().getValue();
-        DatasetType dsType = dd.getDatasetType();
-        String itemTypeName = dd.getItemTypeName().getValue();

-        IDatasetDetails datasetDetails = null;
-        Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                datasetName);
-        if (ds != null) {
-            if (dd.getIfNotExists()) {
-                return;
-            } else {
-                throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        acquireWriteLatch();
+
+        try {
+            DatasetDecl dd = (DatasetDecl) stmt;
+            String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
+                    : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
+            if (dataverseName == null) {
+                throw new AlgebricksException(" dataverse not specified ");
             }
-        }
-        Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-                itemTypeName);
-        if (dt == null) {
-            throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
-        }
-        switch (dd.getDatasetType()) {
-            case INTERNAL: {
-                IAType itemType = dt.getDatatype();
-                if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                    throw new AlgebricksException("Can only partition ARecord's.");
+            String datasetName = dd.getName().getValue();
+            DatasetType dsType = dd.getDatasetType();
+            String itemTypeName = dd.getItemTypeName().getValue();
+
+            IDatasetDetails datasetDetails = null;
+            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    datasetName);
+            if (ds != null) {
+                if (dd.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
                 }
-                List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-                        .getPartitioningExprs();
-                String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-                datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-                        InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
-                break;
             }
-            case EXTERNAL: {
-                String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-                datasetDetails = new ExternalDatasetDetails(adapter, properties);
-                break;
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    itemTypeName);
+            if (dt == null) {
+                throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
             }
-            case FEED: {
-                IAType itemType = dt.getDatatype();
-                if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                    throw new AlgebricksException("Can only partition ARecord's.");
+            switch (dd.getDatasetType()) {
+                case INTERNAL: {
+                    IAType itemType = dt.getDatatype();
+                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                        throw new AlgebricksException("Can only partition ARecord's.");
+                    }
+                    List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getPartitioningExprs();
+                    String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+                            ngName);
+                    break;
                 }
-                List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
-                String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-                String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
-                Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
-                FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
-                datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-                        InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
-                        adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
-                break;
+                case EXTERNAL: {
+                    String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+                    datasetDetails = new ExternalDatasetDetails(adapter, properties);
+                    break;
+                }
+                case FEED: {
+                    IAType itemType = dt.getDatatype();
+                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                        throw new AlgebricksException("Can only partition ARecord's.");
+                    }
+                    List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getPartitioningExprs();
+                    String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
+                    String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
+                    Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getConfiguration();
+                    FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+                    datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
+                            ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+                    break;
+                }
             }
+
+            //#. add a new dataset with PendingAddOp
+            Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
+                    DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+
+            if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+                Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                        dataverseName);
+                JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
+                        metadataProvider);
+
+                //#. make metadataTxn commit before calling runJob.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+
+                //#. runJob
+                runJob(hcc, jobSpec);
+
+                //#. begin new metadataTxn
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            }
+
+            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
+                    datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
+                    IMetadataEntity.PENDING_NO_OP));
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AlgebricksException(e);
+        } finally {
+            releaseWriteLatch();
         }
-        MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
-                datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
-        if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
-            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                    dataverseName);
-            runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
-        }
     }
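handleCreateDatasetStatement above is the template for atomic DDL in this patch: the entity is first committed with PENDING_ADD_OP, the physical artifact is built by a Hyracks job outside any metadata transaction, and a second transaction swaps the entry to PENDING_NO_OP; an exception aborts only the transaction that is still active. A condensed, plain-JDK sketch of that protocol; every name below is a stand-in for the real MetadataManager and Hyracks calls shown in the hunk:

// Condensed protocol sketch of "create with pending flag, run job, then
// re-commit as no-op"; all helpers are illustrative stubs.
final class AtomicCreateSketch {
    static final int PENDING_ADD_OP = 1;
    static final int PENDING_NO_OP = 0;

    public static void main(String[] args) {
        Object tx = beginTxn();
        boolean bActiveTxn = true;
        try {
            addDataset(tx, "Customers", PENDING_ADD_OP); // 1. metadata first, flagged pending
            commit(tx);
            bActiveTxn = false;                          // 2. no txn open while the job runs
            runJob("create physical dataset files");     // 3. build the artifact in the NCs

            tx = beginTxn();
            bActiveTxn = true;
            dropDataset(tx, "Customers");                // 4. replace the pending entry...
            addDataset(tx, "Customers", PENDING_NO_OP);  //    ...with a committed one
            commit(tx);
        } catch (Exception e) {
            if (bActiveTxn) {
                abort(tx);                               // only abort a live txn
            }
            throw new RuntimeException(e);
        }
    }

    static Object beginTxn() { return new Object(); }
    static void commit(Object tx) { }
    static void abort(Object tx) { }
    static void addDataset(Object tx, String name, int pendingOp) {
        System.out.println("add " + name + " pendingOp=" + pendingOp);
    }
    static void dropDataset(Object tx, String name) { System.out.println("drop " + name); }
    static void runJob(String what) { System.out.println("run job: " + what); }
}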
983
984 private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
985 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
986- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
987- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
988- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
989- if (dataverseName == null) {
990- throw new AlgebricksException(" dataverse not specified ");
991- }
992- String datasetName = stmtCreateIndex.getDatasetName().getValue();
993- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
994- datasetName);
995- if (ds == null) {
996- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
997- + dataverseName);
998- }
999- String indexName = stmtCreateIndex.getIndexName().getValue();
1000- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
1001- datasetName, indexName);
1002- if (idx != null) {
1003- if (!stmtCreateIndex.getIfNotExists()) {
1004- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
1005- } else {
1006- stmtCreateIndex.setNeedToCreate(false);
1007+
1008+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1009+ boolean bActiveTxn = true;
1010+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1011+ acquireWriteLatch();
1012+
1013+ try {
1014+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
1015+ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
1016+ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
1017+ if (dataverseName == null) {
1018+ throw new AlgebricksException(" dataverse not specified ");
1019 }
1020- } else {
1021+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
1022+
1023+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
1024+ datasetName);
1025+ if (ds == null) {
1026+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
1027+ + dataverseName);
1028+ }
1029+
1030+ String indexName = stmtCreateIndex.getIndexName().getValue();
1031+ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
1032+ datasetName, indexName);
1033+
1034+ if (idx != null) {
1035+ if (!stmtCreateIndex.getIfNotExists()) {
1036+ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
1037+ } else {
1038+ stmtCreateIndex.setNeedToCreate(false);
1039+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1040+ return;
1041+ }
1042+ }
1043+
1044+ //#. add a new index with PendingAddOp
1045 Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
1046- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
1047+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
1048+ IMetadataEntity.PENDING_ADD_OP);
1049 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
1050- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
1051
1052+ //#. create the index artifact in NC.
1053 CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
1054 index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
1055- JobSpecification loadIndexJobSpec = IndexOperations
1056- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
1057- runJob(hcc, loadIndexJobSpec);
1058+ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
1059+ if (spec == null) {
1060+ throw new AsterixException("Failed to create job spec for creating index '"
1061+ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
1062+ }
1063+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1064+ bActiveTxn = false;
1065+
1066+ runJob(hcc, spec);
1067+
1068+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1069+ bActiveTxn = true;
1070+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1071+
1072+ //#. load data into the index in NC.
1073+ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
1074+ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
1075+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
1076+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1077+ bActiveTxn = false;
1078+
1079+ runJob(hcc, spec);
1080+
1081+ //#. begin new metadataTxn
1082+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1083+ bActiveTxn = true;
1084+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1085+
1086+ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
1087+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
1088+ indexName);
1089+ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
1090+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
1091+ IMetadataEntity.PENDING_NO_OP);
1092+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
1093+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1094+
1095+ } catch (Exception e) {
1096+ if (bActiveTxn) {
1097+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1098+ }
1099+ throw new AlgebricksException(e);
1100+ } finally {
1101+ releaseWriteLatch();
1102 }
1103 }
1104
1105 private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1106 List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
1107 MetadataException {
1108- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1109- TypeDecl stmtCreateType = (TypeDecl) stmt;
1110- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
1111- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
1112- if (dataverseName == null) {
1113- throw new AlgebricksException(" dataverse not specified ");
1114- }
1115- String typeName = stmtCreateType.getIdent().getValue();
1116- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
1117- if (dv == null) {
1118- throw new AlgebricksException("Unknonw dataverse " + dataverseName);
1119- }
1120- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
1121- if (dt != null) {
1122- if (!stmtCreateType.getIfNotExists())
1123- throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
1124- } else {
1125- if (builtinTypeMap.get(typeName) != null) {
1126- throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
1127+
1128+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1129+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1130+ acquireWriteLatch();
1131+
1132+ try {
1133+ TypeDecl stmtCreateType = (TypeDecl) stmt;
1134+ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
1135+ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
1136+ if (dataverseName == null) {
1137+ throw new AlgebricksException(" dataverse not specified ");
1138+ }
1139+ String typeName = stmtCreateType.getIdent().getValue();
1140+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
1141+ if (dv == null) {
1142+ throw new AlgebricksException("Unknonw dataverse " + dataverseName);
1143+ }
1144+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
1145+ if (dt != null) {
1146+ if (!stmtCreateType.getIfNotExists()) {
1147+ throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
1148+ }
1149 } else {
1150- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
1151- dataverseName);
1152- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
1153- IAType type = typeMap.get(typeSignature);
1154- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
1155+ if (builtinTypeMap.get(typeName) != null) {
1156+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
1157+ } else {
1158+ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
1159+ dataverseName);
1160+ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
1161+ IAType type = typeMap.get(typeSignature);
1162+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
1163+ }
1164 }
1165+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1166+ } catch (Exception e) {
1167+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1168+ throw new AlgebricksException(e);
1169+ } finally {
1170+ releaseWriteLatch();
1171 }
1172 }
1173
1174 private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1175 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1176- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1177- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
1178- String dvName = stmtDelete.getDataverseName().getValue();
1179
1180- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
1181- if (dv == null) {
1182- if (!stmtDelete.getIfExists()) {
1183- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
1184+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
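+        // bActiveTxn tracks whether a metadata transaction is currently open,
+        // so that the catch block only aborts a live transaction.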
1185+ boolean bActiveTxn = true;
1186+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1187+ acquireWriteLatch();
1188+
1189+ try {
1190+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
1191+ String dvName = stmtDelete.getDataverseName().getValue();
1192+
1193+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
1194+ if (dv == null) {
1195+ if (!stmtDelete.getIfExists()) {
1196+ throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
1197+ }
1198+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1199+ return;
1200 }
1201- } else {
1202+
1203+            //#. prepare jobs that will drop the dataverse's datasets together with their indexes.
1204 List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
1205 for (int j = 0; j < datasets.size(); j++) {
1206 String datasetName = datasets.get(j).getDatasetName();
1207 DatasetType dsType = datasets.get(j).getDatasetType();
1208 if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
1209+
1210 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
1211 for (int k = 0; k < indexes.size(); k++) {
1212 if (indexes.get(k).isSecondaryIndex()) {
1213- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
1214- metadataProvider);
1215+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
1216+ indexes.get(k).getIndexName());
1217+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
1218 }
1219 }
1220+
1221+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
1222+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
1223 }
1224- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
1225 }
1226
1227+ //#. mark PendingDropOp on the dataverse record by
1228+ // first, deleting the dataverse record from the DATAVERSE_DATASET
1229+ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
1230 MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
1231+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
1232+ IMetadataEntity.PENDING_DROP_OP));
1233+
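+            //#. commit the metadata transaction before running the drop jobs,
+            //   so the (possibly long-running) Hyracks jobs execute outside it.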
1234+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1235+ bActiveTxn = false;
1236+
1237+ for (JobSpecification jobSpec : jobsToExecute) {
1238+ runJob(hcc, jobSpec);
1239+ }
1240+
1241+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1242+ bActiveTxn = true;
1243+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1244+
1245+ //#. finally, delete the dataverse.
1246+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
1247 if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
1248 activeDefaultDataverse = null;
1249 }
1250+
1251+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1252+ } catch (Exception e) {
1253+ if (bActiveTxn) {
1254+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1255+ }
1256+ throw new AlgebricksException(e);
1257+ } finally {
1258+ releaseWriteLatch();
1259 }
1260 }
1261
1262 private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1263 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1264- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1265- DropStatement stmtDelete = (DropStatement) stmt;
1266- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
1267- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
1268- if (dataverseName == null) {
1269- throw new AlgebricksException(" dataverse not specified ");
1270- }
1271- String datasetName = stmtDelete.getDatasetName().getValue();
1272- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
1273- if (ds == null) {
1274- if (!stmtDelete.getIfExists())
1275- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
1276- + dataverseName + ".");
1277- } else {
1278+
1279+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1280+ boolean bActiveTxn = true;
1281+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1282+ acquireWriteLatch();
1283+
1284+ try {
1285+ DropStatement stmtDelete = (DropStatement) stmt;
1286+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
1287+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
1288+ if (dataverseName == null) {
1289+                throw new AlgebricksException("dataverse not specified");
1290+ }
1291+ String datasetName = stmtDelete.getDatasetName().getValue();
1292+
1293+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
1294+ if (ds == null) {
1295+ if (!stmtDelete.getIfExists()) {
1296+ throw new AlgebricksException("There is no dataset with this name " + datasetName
1297+ + " in dataverse " + dataverseName + ".");
1298+ }
1299+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1300+ return;
1301+ }
1302+
1303 if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
1304+
1305+                //#. prepare jobs to drop the dataset and its indexes in NC
1306 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
1307 for (int j = 0; j < indexes.size(); j++) {
1308- if (indexes.get(j).isPrimaryIndex()) {
1309- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
1310- metadataProvider);
1311+ if (indexes.get(j).isSecondaryIndex()) {
1312+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
1313+ indexes.get(j).getIndexName());
1314+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
1315 }
1316 }
1317+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
1318+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
1319+
1320+ //#. mark the existing dataset as PendingDropOp
1321+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
1322+ MetadataManager.INSTANCE.addDataset(
1323+ mdTxnCtx,
1324+ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
1325+ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
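+                // the PendingDropOp flag lets a half-finished drop be detected
+                // (and cleaned up) if a failure occurs before the final delete.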
1326+
1327+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1328+ bActiveTxn = false;
1329+
1330+ //#. run the jobs
1331+ for (JobSpecification jobSpec : jobsToExecute) {
1332+ runJob(hcc, jobSpec);
1333+ }
1334+
1335+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1336+ bActiveTxn = true;
1337+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1338 }
1339- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
1340+
1341+ //#. finally, delete the dataset.
1342+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
1343+
1344+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1345+ } catch (Exception e) {
1346+ if (bActiveTxn) {
1347+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1348+ }
1349+ throw new AlgebricksException(e);
1350+ } finally {
1351+ releaseWriteLatch();
1352 }
1353 }
1354
1355 private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1356 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1357- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1358- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
1359- String datasetName = stmtIndexDrop.getDatasetName().getValue();
1360- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
1361- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
1362- if (dataverseName == null) {
1363- throw new AlgebricksException(" dataverse not specified ");
1364+
1365+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1366+ boolean bActiveTxn = true;
1367+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1368+ acquireWriteLatch();
1369+
1370+ try {
1371+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
1372+ String datasetName = stmtIndexDrop.getDatasetName().getValue();
1373+ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
1374+ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
1375+ if (dataverseName == null) {
1376+                throw new AlgebricksException("dataverse not specified");
1377+ }
1378+
1379+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
1380+ if (ds == null) {
1381+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
1382+ + dataverseName);
1383+ }
1384+
1385+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
1386+ String indexName = stmtIndexDrop.getIndexName().getValue();
1387+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
1388+ if (index == null) {
1389+ if (!stmtIndexDrop.getIfExists()) {
1390+ throw new AlgebricksException("There is no index with this name " + indexName + ".");
1391+ }
1392+ } else {
1393+ //#. prepare a job to drop the index in NC.
1394+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
1395+ indexName);
1396+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
1397+
1398+ //#. mark PendingDropOp on the existing index
1399+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
1400+ MetadataManager.INSTANCE.addIndex(
1401+ mdTxnCtx,
1402+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
1403+ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
1404+
1405+ //#. commit the existing transaction before calling runJob.
1406+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1407+ bActiveTxn = false;
1408+
1409+ for (JobSpecification jobSpec : jobsToExecute) {
1410+ runJob(hcc, jobSpec);
1411+ }
1412+
1413+ //#. begin a new transaction
1414+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1415+ bActiveTxn = true;
1416+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1417+
1418+ //#. finally, delete the existing index
1419+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
1420+ }
1421+ } else {
1422+ throw new AlgebricksException(datasetName
1423+ + " is an external dataset. Indexes are not maintained for external datasets.");
1424+ }
1425+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1426+
1427+ } catch (Exception e) {
1428+ if (bActiveTxn) {
1429+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1430+ }
1431+ throw new AlgebricksException(e);
1432+
1433+ } finally {
1434+ releaseWriteLatch();
1435 }
1436- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
1437- if (ds == null)
1438- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
1439- + dataverseName);
1440- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
1441- String indexName = stmtIndexDrop.getIndexName().getValue();
1442- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
1443- if (idx == null) {
1444- if (!stmtIndexDrop.getIfExists())
1445- throw new AlgebricksException("There is no index with this name " + indexName + ".");
1446- } else
1447- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
1448- } else {
1449- throw new AlgebricksException(datasetName
1450- + " is an external dataset. Indexes are not maintained for external datasets.");
1451- }
1452 }
1453
1454 private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1455 List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
1456 ACIDException {
1457- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1458- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
1459- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
1460- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
1461- if (dataverseName == null) {
1462- throw new AlgebricksException(" dataverse not specified ");
1463+
1464+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1465+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1466+ acquireWriteLatch();
1467+
1468+ try {
1469+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
1470+ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
1471+ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
1472+ if (dataverseName == null) {
1473+                throw new AlgebricksException("dataverse not specified");
1474+ }
1475+ String typeName = stmtTypeDrop.getTypeName().getValue();
1476+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
1477+ if (dt == null) {
1478+ if (!stmtTypeDrop.getIfExists())
1479+ throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
1480+ } else {
1481+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
1482+ }
1483+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1484+ } catch (Exception e) {
1485+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1486+ throw new AlgebricksException(e);
1487+ } finally {
1488+ releaseWriteLatch();
1489 }
1490- String typeName = stmtTypeDrop.getTypeName().getValue();
1491- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
1492- if (dt == null) {
1493- if (!stmtTypeDrop.getIfExists())
1494- throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
1495- } else {
1496- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
1497- }
1498 }
1499
1500 private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1501 List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
1502 ACIDException {
1503- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1504- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
1505- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
1506- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
1507- if (ng == null) {
1508- if (!stmtDelete.getIfExists())
1509- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
1510- } else {
1511- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
1512+
1513+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1514+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1515+ acquireWriteLatch();
1516+
1517+ try {
1518+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
1519+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
1520+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
1521+ if (ng == null) {
1522+ if (!stmtDelete.getIfExists())
1523+ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
1524+ } else {
1525+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
1526+ }
1527+
1528+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1529+ } catch (Exception e) {
1530+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1531+ throw new AlgebricksException(e);
1532+ } finally {
1533+ releaseWriteLatch();
1534 }
1535 }
1536
1537 private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1538 List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
1539 ACIDException {
1540- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1541- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
1542- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
1543- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
1544- if (dataverse == null) {
1545- throw new AlgebricksException(" dataverse not specified ");
1546+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1547+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1548+ acquireWriteLatch();
1549+
1550+ try {
1551+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
1552+ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
1553+ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
1554+ if (dataverse == null) {
1555+                throw new AlgebricksException("dataverse not specified");
1556+ }
1557+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
1558+ if (dv == null) {
1559+ throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
1560+ }
1561+ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
1562+ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
1563+ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
1564+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
1565+
1566+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1567+ } catch (Exception e) {
1568+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1569+ throw new AlgebricksException(e);
1570+ } finally {
1571+ releaseWriteLatch();
1572 }
1573- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
1574- if (dv == null) {
1575- throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
1576- }
1577- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
1578- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
1579- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
1580- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
1581 }
1582
1583 private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1584 List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
1585 AlgebricksException {
1586- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1587- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
1588- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
1589- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
1590- if (function == null) {
1591- if (!stmtDropFunction.getIfExists())
1592- throw new AlgebricksException("Unknonw function " + signature);
1593- } else {
1594- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
1595+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1596+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1597+ acquireWriteLatch();
1598+
1599+ try {
1600+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
1601+ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
1602+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
1603+ if (function == null) {
1604+ if (!stmtDropFunction.getIfExists())
1605+                    throw new AlgebricksException("Unknown function " + signature);
1606+ } else {
1607+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
1608+ }
1609+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1610+ } catch (Exception e) {
1611+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1612+ throw new AlgebricksException(e);
1613+ } finally {
1614+ releaseWriteLatch();
1615 }
1616 }
1617
1618 private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1619 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1620- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1621- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
1622- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
1623- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
1624- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
1625- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
1626
1627- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
1628- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
1629- jobsToExecute.add(job.getJobSpec());
1630- // Also load the dataset's secondary indexes.
1631- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
1632- .getDatasetName().getValue());
1633- for (Index index : datasetIndexes) {
1634- if (!index.isSecondaryIndex()) {
1635- continue;
1636+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1637+ boolean bActiveTxn = true;
1638+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1639+ acquireReadLatch();
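+        // loading only reads metadata, so the shared (read) latch suffices here.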
1640+
1641+ try {
1642+ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
1643+ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
1644+ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
1645+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
1646+ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
1647+ loadStmt.dataIsAlreadySorted());
1648+
1649+ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
1650+ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
1651+ jobsToExecute.add(job.getJobSpec());
1652+ // Also load the dataset's secondary indexes.
1653+ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
1654+ .getDatasetName().getValue());
1655+ for (Index index : datasetIndexes) {
1656+ if (!index.isSecondaryIndex()) {
1657+ continue;
1658+ }
1659+ // Create CompiledCreateIndexStatement from metadata entity 'index'.
1660+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
1661+ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
1662+ index.getIndexType());
1663+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
1664 }
1665- // Create CompiledCreateIndexStatement from metadata entity 'index'.
1666- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
1667- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
1668- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
1669+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1670+ bActiveTxn = false;
1671+
1672+ for (JobSpecification jobspec : jobsToExecute) {
1673+ runJob(hcc, jobspec);
1674+ }
1675+ } catch (Exception e) {
1676+ if (bActiveTxn) {
1677+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1678+ }
1679+ throw new AlgebricksException(e);
1680+ } finally {
1681+ releaseReadLatch();
1682 }
1683 }
1684
1685 private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1686 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1687- metadataProvider.setWriteTransaction(true);
1688- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
1689- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
1690- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
1691- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
1692- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
1693
1694- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
1695- if (compiled.first != null) {
1696- jobsToExecute.add(compiled.first);
1697+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1698+ boolean bActiveTxn = true;
1699+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1700+ acquireReadLatch();
1701+
1702+ try {
1703+ metadataProvider.setWriteTransaction(true);
1704+ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
1705+ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
1706+ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
1707+ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
1708+ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
1709+
1710+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
1711+ clfrqs);
1712+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1713+ bActiveTxn = false;
1714+ if (compiled.first != null) {
1715+ runJob(hcc, compiled.first);
1716+ }
1717+ } catch (Exception e) {
1718+ if (bActiveTxn) {
1719+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1720+ }
1721+ throw new AlgebricksException(e);
1722+ } finally {
1723+ releaseReadLatch();
1724 }
1725 }
1726
1727 private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1728 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1729- metadataProvider.setWriteTransaction(true);
1730- InsertStatement stmtInsert = (InsertStatement) stmt;
1731- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
1732- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
1733- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
1734- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
1735- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
1736- if (compiled.first != null) {
1737- jobsToExecute.add(compiled.first);
1738+
1739+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1740+ boolean bActiveTxn = true;
1741+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1742+ acquireReadLatch();
1743+
1744+ try {
1745+ metadataProvider.setWriteTransaction(true);
1746+ InsertStatement stmtInsert = (InsertStatement) stmt;
1747+ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
1748+ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
1749+ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
1750+ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
1751+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
1752+ clfrqs);
1753+
1754+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1755+ bActiveTxn = false;
1756+
1757+ if (compiled.first != null) {
1758+ runJob(hcc, compiled.first);
1759+ }
1760+
1761+ } catch (Exception e) {
1762+ if (bActiveTxn) {
1763+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1764+ }
1765+ throw new AlgebricksException(e);
1766+ } finally {
1767+ releaseReadLatch();
1768 }
1769 }
1770
1771 private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1772 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1773- metadataProvider.setWriteTransaction(true);
1774- DeleteStatement stmtDelete = (DeleteStatement) stmt;
1775- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
1776- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
1777- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
1778- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
1779- stmtDelete.getVarCounter(), metadataProvider);
1780- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
1781- if (compiled.first != null) {
1782- jobsToExecute.add(compiled.first);
1783+
1784+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1785+ boolean bActiveTxn = true;
1786+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1787+ acquireReadLatch();
1788+
1789+ try {
1790+ metadataProvider.setWriteTransaction(true);
1791+ DeleteStatement stmtDelete = (DeleteStatement) stmt;
1792+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
1793+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
1794+ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
1795+ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
1796+ stmtDelete.getVarCounter(), metadataProvider);
1797+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
1798+ clfrqs);
1799+
1800+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1801+ bActiveTxn = false;
1802+
1803+ if (compiled.first != null) {
1804+ runJob(hcc, compiled.first);
1805+ }
1806+
1807+ } catch (Exception e) {
1808+ if (bActiveTxn) {
1809+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1810+ }
1811+ throw new AlgebricksException(e);
1812+ } finally {
1813+ releaseReadLatch();
1814 }
1815 }
1816
1817@@ -704,46 +1106,109 @@
1818
1819 private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1820 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1821- BeginFeedStatement bfs = (BeginFeedStatement) stmt;
1822- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
1823- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
1824
1825- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
1826- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
1827+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1828+ boolean bActiveTxn = true;
1829+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1830+ acquireReadLatch();
1831
1832- Dataset dataset;
1833- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
1834- .getDatasetName().getValue());
1835- IDatasetDetails datasetDetails = dataset.getDatasetDetails();
1836- if (datasetDetails.getDatasetType() != DatasetType.FEED) {
1837- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
1838+ try {
1839+ BeginFeedStatement bfs = (BeginFeedStatement) stmt;
1840+ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
1841+                    : activeDefaultDataverse.getDataverseName() : bfs.getDataverseName().getValue();
1842+
1843+ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
1844+ .getValue(), bfs.getQuery(), bfs.getVarCounter());
1845+
1846+            Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
1847+                    dataverseName, bfs.getDatasetName().getValue());
+            if (dataset == null) {
+                throw new AlgebricksException("There is no dataset with this name "
+                        + bfs.getDatasetName().getValue() + " in dataverse " + dataverseName);
+            }
1849+ IDatasetDetails datasetDetails = dataset.getDatasetDetails();
1850+ if (datasetDetails.getDatasetType() != DatasetType.FEED) {
1851+ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
1852+ + " is not a feed dataset");
1853+ }
1854+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
1855+ cbfs.setQuery(bfs.getQuery());
1856+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
1857+
1858+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1859+ bActiveTxn = false;
1860+
1861+ if (compiled.first != null) {
1862+ runJob(hcc, compiled.first);
1863+ }
1864+
1865+ } catch (Exception e) {
1866+ if (bActiveTxn) {
1867+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1868+ }
1869+ throw new AlgebricksException(e);
1870+ } finally {
1871+ releaseReadLatch();
1872 }
1873- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
1874- cbfs.setQuery(bfs.getQuery());
1875- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
1876- if (compiled.first != null) {
1877- jobsToExecute.add(compiled.first);
1878- }
1879 }
1880
1881 private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1882 IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
1883- ControlFeedStatement cfs = (ControlFeedStatement) stmt;
1884- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
1885- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
1886- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
1887- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
1888- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
1889+
1890+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1891+ boolean bActiveTxn = true;
1892+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1893+ acquireReadLatch();
1894+
1895+ try {
1896+ ControlFeedStatement cfs = (ControlFeedStatement) stmt;
1897+ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
1898+                    : activeDefaultDataverse.getDataverseName() : cfs.getDataverseName().getValue();
1899+ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
1900+ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
1901+ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
1902+
1903+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1904+ bActiveTxn = false;
1905+
1906+ runJob(hcc, jobSpec);
1907+
1908+ } catch (Exception e) {
1909+ if (bActiveTxn) {
1910+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1911+ }
1912+ throw new AlgebricksException(e);
1913+ } finally {
1914+ releaseReadLatch();
1915+ }
1916 }
1917
1918 private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
1919 List<JobSpecification> jobsToExecute) throws Exception {
1920- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
1921- if (compiled.first != null) {
1922- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
1923- jobsToExecute.add(compiled.first);
1924+
1925+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1926+ boolean bActiveTxn = true;
1927+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1928+ acquireReadLatch();
1929+
1930+ try {
1931+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
1932+
1933+ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
1934+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1935+ bActiveTxn = false;
1936+
1937+ if (compiled.first != null) {
1938+ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
1939+ runJob(hcc, compiled.first);
1940+ }
1941+
1942+ return queryResult;
1943+ } catch (Exception e) {
1944+ if (bActiveTxn) {
1945+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1946+ }
1947+ throw new AlgebricksException(e);
1948+ } finally {
1949+ releaseReadLatch();
1950 }
1951- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
1952 }
1953
1954 private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
1955@@ -768,20 +1233,32 @@
1956 private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
1957 List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
1958 ACIDException {
1959- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
1960- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
1961- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
1962- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
1963- if (ng != null) {
1964- if (!stmtCreateNodegroup.getIfNotExists())
1965- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
1966- } else {
1967- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
1968- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
1969- for (Identifier id : ncIdentifiers) {
1970- ncNames.add(id.getValue());
1971+
1972+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
1973+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
1974+ acquireWriteLatch();
1975+
1976+ try {
1977+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
1978+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
1979+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
1980+ if (ng != null) {
1981+ if (!stmtCreateNodegroup.getIfNotExists())
1982+ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
1983+ } else {
1984+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
1985+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
1986+ for (Identifier id : ncIdentifiers) {
1987+ ncNames.add(id.getValue());
1988+ }
1989+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
1990 }
1991- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
1992+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
1993+ } catch (Exception e) {
1994+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
1995+ throw new AlgebricksException(e);
1996+ } finally {
1997+ releaseWriteLatch();
1998 }
1999 }
2000
2001@@ -791,10 +1268,37 @@
2002
2003 private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
2004 String indexName, AqlMetadataProvider metadataProvider) throws Exception {
2005+ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
2006+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
2007+
2008+ //#. mark PendingDropOp on the existing index
2009+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
2010+ MetadataManager.INSTANCE.addIndex(
2011+ mdTxnCtx,
2012+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index
2013+ .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
2014 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
2015- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
2016- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
2017- indexName);
2018+ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
2019+
2020+ //#. commit the existing transaction before calling runJob.
2021+ // the caller should begin the transaction before calling this function.
2022+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2023+
2024+ try {
2025+ runJob(hcc, jobSpec);
2026+ } catch (Exception e) {
2027+            // re-create mdTxnCtx so that the caller can properly abort a live transaction
2028+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2029+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2030+ throw e;
2031+ }
2032+
2033+ //#. begin a new transaction
2034+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2035+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2036+
2037+ //#. finally, delete the existing index
2038+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
2039 }
2040
2041 private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
2042@@ -803,10 +1307,32 @@
2043 CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
2044 Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
2045 if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
2046- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
2047- for (JobSpecification spec : jobSpecs)
2048- runJob(hcc, spec);
2049+ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
2050+
2051+ //#. mark PendingDropOp on the existing dataset
2052+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
2053+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(),
2054+ ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
2055+
2056+ //#. commit the transaction
2057+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2058+
2059+ //#. run the job
2060+ try {
2061+ runJob(hcc, jobSpec);
2062+ } catch (Exception e) {
2063+                // re-create mdTxnCtx so that the caller can properly abort a live transaction
2064+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2065+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2066+ throw e;
2067+ }
2068+
2069+ //#. start a new transaction
2070+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2071+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2072 }
2073+
2074+ //#. finally, delete the existing dataset.
2075 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
2076 }
2077
2078@@ -831,4 +1357,20 @@
2079 }
2080 return format;
2081 }
2082+
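+    // cacheLatch is a read-write lock over the metadata cache: DDL handlers
+    // acquire the write latch, while query/DML compilation acquires the read latch.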
2083+ private void acquireWriteLatch() {
2084+ cacheLatch.writeLock().lock();
2085+ }
2086+
2087+ private void releaseWriteLatch() {
2088+ cacheLatch.writeLock().unlock();
2089+ }
2090+
2091+ private void acquireReadLatch() {
2092+ cacheLatch.readLock().lock();
2093+ }
2094+
2095+ private void releaseReadLatch() {
2096+ cacheLatch.readLock().unlock();
2097+ }
2098 }