Merge branch 'master' into westmann/doc
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e64f68f..d4d7ac6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -27,6 +27,8 @@
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
@@ -99,10 +101,9 @@
ioManager = ncApplicationContext.getRootContext().getIOManager();
bufferCache = new BufferCache(ioManager, allocator, prs, pcp, fileMapManager,
storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages(),
- storageProperties.getBufferCacheMaxOpenFiles());
+ storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
indexLifecycleManager = new IndexLifecycleManager(storageProperties.getMemoryComponentGlobalBudget());
-
lsmIOScheduler = SynchronousScheduler.INSTANCE;
mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
lsmBTreeOpTrackerFactory = new IndexOperationTrackerFactory(LSMBTreeIOOperationCallbackFactory.INSTANCE);
@@ -120,6 +121,14 @@
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider);
isShuttingdown = false;
+
+        // The order of registration is important. The buffer cache must be registered before the recovery and transaction managers.
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) bufferCache);
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) indexLifecycleManager);
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
}
public boolean isShuttingdown() {
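Note on the registration block above: LifeCycleComponentManager presumably stops components in the reverse of their registration order, which is why the buffer cache (registered first, stopped last) must outlive the recovery and transaction managers during shutdown. A minimal sketch of the intended pattern, built only from the calls this patch introduces (the boolean to stopAll presumably controls whether components dump state to the configured dump path):

    // Register in dependency order: dependents after their dependencies.
    LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) bufferCache);     // stopped last
    LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) recoveryManager); // stopped first
    LifeCycleComponentManager.INSTANCE.startAll();     // at node startup
    LifeCycleComponentManager.INSTANCE.stopAll(false); // at shutdown, without state dumps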
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 5d4f6ec..9b83f00 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -13,6 +13,7 @@
import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryStatusAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.UpdateAPIServlet;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -37,12 +38,12 @@
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
this.appCtx = ccAppCtx;
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix cluster controller");
}
-
+ appCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
AsterixAppContextInfo.initialize(appCtx);
-
proxy = AsterixStateProxy.registerRemoteObject();
appCtx.setDistributedState(proxy);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4569088..32bfeb5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -1,11 +1,15 @@
package edu.uci.ics.asterix.hyracks.bootstrap;
+import java.io.File;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.api.common.AsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -19,6 +23,7 @@
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
@@ -32,17 +37,19 @@
@Override
public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ ncAppCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
ncApplicationContext = ncAppCtx;
nodeId = ncApplicationContext.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting Asterix node controller: " + nodeId);
+ LOGGER.info("Starting Asterix node controller TAKE NOTE: " + nodeId);
}
+ JVMShutdownHook sHook = new JVMShutdownHook(this);
+ Runtime.getRuntime().addShutdownHook(sHook);
+
runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
runtimeContext.initialize();
ncApplicationContext.setApplicationObject(runtimeContext);
- JVMShutdownHook sHook = new JVMShutdownHook(this);
- Runtime.getRuntime().addShutdownHook(sHook);
// #. recover if the system is corrupted by checking system state.
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -78,6 +85,8 @@
if (isMetadataNode) {
MetadataBootstrap.stopUniverse();
}
+
+ LifeCycleComponentManager.INSTANCE.stopAll(false);
runtimeContext.deinitialize();
} else {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -89,7 +98,8 @@
@Override
public void notifyStartupComplete() throws Exception {
IAsterixStateProxy proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
- AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider)runtimeContext).getMetadataProperties();
+ AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+ .getMetadataProperties();
if (systemState == SystemState.NEW_UNIVERSE) {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -113,11 +123,29 @@
}
MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
MetadataManager.INSTANCE.init();
- MetadataBootstrap.startUniverse( ((IAsterixPropertiesProvider)runtimeContext), ncApplicationContext,
+ MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
systemState == SystemState.NEW_UNIVERSE);
MetadataBootstrap.startDDLRecovery();
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting lifecycle components");
+ }
+
+ Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
+ String key = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
+ String value = metadataProperties.getCoredumpPath(nodeId);
+ lifecycleMgmtConfiguration.put(key, value);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Coredump directory for NC is: " + value);
+ }
+ LifeCycleComponentManager.INSTANCE.configure(lifecycleMgmtConfiguration);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Configured:" + LifeCycleComponentManager.INSTANCE);
+ }
+
+ LifeCycleComponentManager.INSTANCE.startAll();
+
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
recoveryMgr.checkpoint(true);
diff --git a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..b39fced 100644
--- a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
@@ -5,21 +5,21 @@
<configuration>
- <property>
- <name>mapred.job.tracker</name>
- <value>localhost:29007</value>
- </property>
- <property>
- <name>mapred.tasktracker.map.tasks.maximum</name>
- <value>20</value>
- </property>
- <property>
- <name>mapred.tasktracker.reduce.tasks.maximum</name>
- <value>20</value>
- </property>
- <property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
- </property>
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>128</value>
+ </property>
</configuration>
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
index 653ee6c..527a0e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
@@ -8,4 +8,5 @@
use dataverse test;
for $x in dataset('TextDataset')
+order by $x.line
return $x
diff --git a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
index 8af2f5f..59425b1 100644
--- a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
@@ -1,4 +1,4 @@
+{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
+{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
{ "line": "The ASTERIX project is developing new technologies for ingesting, storing, managing, indexing, querying, analyzing, and subscribing to vast quantities of semi-structured information" }
{ "line": "The project is combining ideas from three distinct areas semi-structured data, parallel databases, and data-intensive computing to create a next-generation, open source software platform that scales by running on large, shared-nothing commodity computing clusters" }
-{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
-{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
new file mode 100644
index 0000000..14975ff
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class AsterixThreadExecutor implements Executor {
+ public final static AsterixThreadExecutor INSTANCE = new AsterixThreadExecutor();
+ private final Executor executor = Executors.newCachedThreadPool(AsterixThreadFactory.INSTANCE);
+
+ private AsterixThreadExecutor() {
+
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+}
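A quick usage sketch for the new executor (the Runnable body is illustrative): anything submitted through it runs on a thread created by AsterixThreadFactory and therefore inherits the lifecycle manager as its uncaught-exception handler, which is how this patch routes thread failures into a coordinated shutdown.

    AsterixThreadExecutor.INSTANCE.execute(new Runnable() {
        @Override
        public void run() {
            // supervised background work, e.g. the feed inbox monitor used later in this patch
        }
    });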
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
new file mode 100644
index 0000000..7e4735f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.ThreadFactory;
+
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+
+public class AsterixThreadFactory implements ThreadFactory {
+
+ public final static AsterixThreadFactory INSTANCE = new AsterixThreadFactory();
+
+ private AsterixThreadFactory() {
+
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t;
+        if (r instanceof Thread) {
+ t = (Thread) r;
+ } else {
+ t = new Thread(r);
+ }
+ t.setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
+ return t;
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
index 6d47e78..6b6cded 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
@@ -24,5 +24,9 @@
public Set<String> getNodeNames() {
return accessor.getNodeNames();
}
+
+    public String getCoredumpPath(String nodeId) {
+ return accessor.getCoredumpPath(nodeId);
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
index 7b2f2a6..d623ae5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
@@ -16,6 +16,7 @@
import javax.xml.bind.Unmarshaller;
import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
import edu.uci.ics.asterix.common.configuration.Property;
import edu.uci.ics.asterix.common.configuration.Store;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -26,6 +27,7 @@
private final String metadataNodeName;
private final Set<String> nodeNames;
private final Map<String, String[]> stores;
+ private final Map<String, String> coredumpConfig;
private final Map<String, Property> asterixConfigurationParams;
public AsterixPropertiesAccessor() throws AsterixException {
@@ -64,6 +66,11 @@
for (Property p : asterixConfiguration.getProperty()) {
asterixConfigurationParams.put(p.getName(), p);
}
+ coredumpConfig = new HashMap<String, String>();
+ for (Coredump cd : asterixConfiguration.getCoredump()) {
+ coredumpConfig.put(cd.getNcId(), cd.getCoredumpPath());
+ }
+
}
public String getMetadataNodeName() {
@@ -82,6 +89,10 @@
return nodeNames;
}
+ public String getCoredumpPath(String nodeId) {
+ return coredumpConfig.get(nodeId);
+ }
+
public <T> T getProperty(String property, T defaultValue, IPropertyInterpreter<T> interpreter) {
Property p = asterixConfigurationParams.get(property);
if (p == null) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 9470d17..c90422c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -50,12 +50,10 @@
ACIDException;
/**
- * @param logicalLogLocator
+ * @param lsnValue
* TODO
* @param logicalLogLocator
* TODO
- * @param PhysicalLogLocator
- * specifies the location of the log record to be read
* @throws ACIDException
*/
public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index a06cc75..a7f2984 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -1,7 +1,6 @@
package edu.uci.ics.asterix.common.transactions;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -42,6 +41,8 @@
public void setTransactionType(TransactionType transactionType);
+ public String prettyPrint();
+
public static final long INVALID_TIME = -1l; // used for showing a
// transaction is not waiting.
public static final int ACTIVE_STATUS = 0;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 2bfc00d..e57cc64 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -120,5 +120,4 @@
*/
public ITransactionSubsystem getTransactionProvider();
-
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
index 4dc943c..89816aa 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
@@ -105,7 +105,7 @@
public String getLogDirKey() {
return logDirKey;
}
-
+
public int getDiskSectorSize() {
return diskSectorSize;
}
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index f53fb4b..5aefdbd 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -7,6 +7,7 @@
<xs:element name="metadataNode" type="xs:string" />
+ <xs:element name="coredumpPath" type="xs:string" />
<xs:element name="storeDirs" type="xs:string" />
<xs:element name="ncId" type="xs:string" />
<xs:element name="name" type="xs:string" />
@@ -23,6 +24,15 @@
</xs:complexType>
</xs:element>
+ <xs:element name="coredump">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref="mg:ncId" />
+ <xs:element ref="mg:coredumpPath" />
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
<xs:element name="property">
<xs:complexType>
<xs:sequence>
@@ -39,6 +49,7 @@
<xs:sequence>
<xs:element ref="mg:metadataNode" minOccurs="0"/>
<xs:element ref="mg:store" maxOccurs="unbounded" />
+ <xs:element ref="mg:coredump" maxOccurs="unbounded" />
<xs:element ref="mg:property" minOccurs="0" maxOccurs="unbounded" />
</xs:sequence>
</xs:complexType>
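Under the extended schema, an asterix-configuration instance is expected to carry one coredump entry per node controller, after the store elements and before any property elements; the node id and path below are illustrative:

    <coredump>
        <ncId>nc1</ncId>
        <coredumpPath>/var/lib/asterix/coredump/nc1</coredumpPath>
    </coredump>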
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index e702ef3..63a791b 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -18,7 +18,6 @@
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 842cd67..aff6967 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -131,7 +131,8 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>jdom</groupId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..8c8880a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -23,9 +23,10 @@
* A factory class for creating the @see {CNNFeedAdapter}.
*/
public class CNNFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
cnnFeedAdapter.configure(configuration);
return cnnFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..c267658 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -14,22 +14,79 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
/**
* A factory class for creating an instance of HDFSAdapter
*/
+@SuppressWarnings("deprecation")
public class HDFSAdapterFactory implements IGenericDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
public static final String HDFS_ADAPTER_NAME = "hdfs";
+ public static final String CLUSTER_LOCATIONS = "cluster-locations";
+ public static transient String SCHEDULER = "hdfs-scheduler";
+
+ public static final String KEY_HDFS_URL = "hdfs";
+ public static final String KEY_PATH = "path";
+ public static final String KEY_INPUT_FORMAT = "input-format";
+ public static final String INPUT_FORMAT_TEXT = "text-input-format";
+ public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+ private transient AlgebricksPartitionConstraint clusterLocations;
+ private String[] readSchedule;
+ private boolean executed[];
+ private InputSplitsFactory inputSplitsFactory;
+ private ConfFactory confFactory;
+ private boolean setup = false;
+
+ private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+ private static Map<String, String> initInputFormatMap() {
+ Map<String, String> formatClassNames = new HashMap<String, String>();
+ formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+ formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+ return formatClassNames;
+ }
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
- HDFSAdapter hdfsAdapter = new HDFSAdapter(atype);
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+ if (!setup) {
+            /** Set up the serializable state of the factory; this if-block executes only once per factory instance. */
+            JobConf conf = configureJobConf(configuration);
+ confFactory = new ConfFactory(conf);
+
+ clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+ int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+ InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+ inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+ Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+ readSchedule = scheduler.getLocationConstraints(inputSplits);
+ executed = new boolean[readSchedule.length];
+ Arrays.fill(executed, false);
+
+ setup = true;
+ }
+ JobConf conf = confFactory.getConf();
+ InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+ HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
hdfsAdapter.configure(configuration);
return hdfsAdapter;
}
@@ -39,4 +96,15 @@
return HDFS_ADAPTER_NAME;
}
+ private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+ JobConf conf = new JobConf();
+ conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+ conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+ conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+ conf.set("mapred.input.format.class",
+ (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+ return conf;
+ }
+
}
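For orientation, a hedged sketch of the configuration map this factory now expects: the keys come from the constants above, the values are illustrative, and the CLUSTER_LOCATIONS and SCHEDULER entries are injected by the compiler rather than written by users.

    Map<String, Object> configuration = new HashMap<String, Object>();
    configuration.put(HDFSAdapterFactory.KEY_HDFS_URL, "hdfs://localhost:10009");
    configuration.put(HDFSAdapterFactory.KEY_PATH, "/user/asterix/input");
    configuration.put(HDFSAdapterFactory.KEY_INPUT_FORMAT, HDFSAdapterFactory.INPUT_FORMAT_TEXT);
    configuration.put("format", "delimited-text"); // consumed by FileSystemBasedAdapter.configureFormat()
    configuration.put(HDFSAdapterFactory.CLUSTER_LOCATIONS, clusterLocations); // AlgebricksPartitionConstraint
    configuration.put(HDFSAdapterFactory.SCHEDULER, scheduler);                // hyracks-hdfs Scheduler
    IDatasourceAdapter adapter = new HDFSAdapterFactory().createAdapter(configuration, recordType);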
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..ba6ade5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,20 +14,92 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
/**
* A factory class for creating an instance of HiveAdapter
*/
+@SuppressWarnings("deprecation")
public class HiveAdapterFactory implements IGenericDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final String HDFS_ADAPTER_NAME = "hdfs";
+ public static final String CLUSTER_LOCATIONS = "cluster-locations";
+ public static transient String SCHEDULER = "hdfs-scheduler";
+
+ public static final String KEY_HDFS_URL = "hdfs";
+ public static final String KEY_PATH = "path";
+ public static final String KEY_INPUT_FORMAT = "input-format";
+ public static final String INPUT_FORMAT_TEXT = "text-input-format";
+ public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+ public static final String KEY_FORMAT = "format";
+ public static final String KEY_PARSER_FACTORY = "parser";
+ public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+ public static final String FORMAT_ADM = "adm";
+
+ public static final String HIVE_DATABASE = "database";
+ public static final String HIVE_TABLE = "table";
+ public static final String HIVE_HOME = "hive-home";
+ public static final String HIVE_METASTORE_URI = "metastore-uri";
+ public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
+ public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
+
+ private String[] readSchedule;
+ private boolean executed[];
+ private InputSplitsFactory inputSplitsFactory;
+ private ConfFactory confFactory;
+ private transient AlgebricksPartitionConstraint clusterLocations;
+ private boolean setup = false;
+
+ private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+ private static Map<String, String> initInputFormatMap() {
+ Map<String, String> formatClassNames = new HashMap<String, String>();
+ formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+ formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+ return formatClassNames;
+ }
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
- HiveAdapter hiveAdapter = new HiveAdapter(type);
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+ if (!setup) {
+            /** Set up the serializable state of the factory; this if-block executes only once per factory instance. */
+            JobConf conf = configureJobConf(configuration);
+ confFactory = new ConfFactory(conf);
+
+ clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+ int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+ InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+ inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+ Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+ readSchedule = scheduler.getLocationConstraints(inputSplits);
+ executed = new boolean[readSchedule.length];
+ Arrays.fill(executed, false);
+
+ setup = true;
+ }
+ JobConf conf = confFactory.getConf();
+ InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+ HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
hiveAdapter.configure(configuration);
return hiveAdapter;
}
@@ -36,4 +108,37 @@
public String getName() {
return "hive";
}
+
+ private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+ JobConf conf = new JobConf();
+
+ /** configure hive */
+ String database = (String) configuration.get(HIVE_DATABASE);
+ String tablePath = null;
+ if (database == null) {
+ tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
+ } else {
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + database + ".db" + "/"
+                    + configuration.get(HIVE_TABLE);
+ }
+ configuration.put(HDFSAdapter.KEY_PATH, tablePath);
+ if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
+ throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
+ }
+
+ if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT) || configuration
+ .get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) {
+ throw new IllegalArgumentException("file input format"
+ + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
+ }
+
+ /** configure hdfs */
+ conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+ conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+ conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+ conf.set("mapred.input.format.class",
+ (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+ return conf;
+ }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
index 45fd6cf..697c8ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
@@ -14,12 +14,14 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.io.Serializable;
+
/**
* Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
* Acts as a marker interface indicating that the implementation provides functionality
* for creating an adapter.
*/
-public interface IAdapterFactory {
+public interface IAdapterFactory extends Serializable {
/**
* Returns the display name corresponding to the Adapter type that is created by the factory.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 093a3dd..e8d120a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -38,6 +38,6 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception;
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
index 0f9978e..84a5ca8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
@@ -33,6 +33,6 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception;
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..659fd23 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -26,11 +26,11 @@
* an NC.
*/
public class NCFileSystemAdapterFactory implements IGenericDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
fsAdapter.configure(configuration);
return fsAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..e63be17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -25,11 +25,11 @@
* via pull-based Twitter API.
*/
public class PullBasedTwitterAdapterFactory implements ITypedDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
twitterAdapter.configure(configuration);
return twitterAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..3cd22e8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -24,11 +24,11 @@
* RSSFeedAdapter provides the functionality of fetching an RSS based feed.
*/
public class RSSFeedAdapterFactory implements ITypedDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
rssFeedAdapter.configure(configuration);
return rssFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index fb4cc99..f9f72cf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -19,8 +19,6 @@
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -38,61 +36,24 @@
private static final long serialVersionUID = 1L;
- private final String adapterFactory;
- private final Map<String, String> adapterConfiguration;
+ private final Map<String, Object> adapterConfiguration;
private final IAType atype;
private IGenericDatasetAdapterFactory datasourceAdapterFactory;
- public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
- IAType atype, RecordDescriptor rDesc) {
+ public ExternalDataScanOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
+ RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
- this.adapterFactory = adapter;
this.adapterConfiguration = arguments;
this.atype = atype;
- }
-
- @Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
-
- /*
- Comment: The following code is commented out. This is because constraints are being set at compile time so that they can
- be propagated to upstream Asterix operators. Hyracks has to provide a way to propagate constraints to upstream operators.
- Once it is there, we will uncomment the following code.
-
- AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
- switch (constraint.getPartitionConstraintType()) {
- case ABSOLUTE:
- String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
- for (int i = 0; i < locations.length; ++i) {
- constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
- new ConstantExpression(locations[i])));
- }
- constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
- new ConstantExpression(locations.length)));
-
- break;
- case COUNT:
- constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
- new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
- break;
- default:
- throw new IllegalStateException(" Constraint type :" + constraint.getPartitionConstraintType()
- + " not supported");
-
- }*/
-
+ this.datasourceAdapterFactory = dataSourceAdapterFactory;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- try {
- datasourceAdapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactory).newInstance();
- } catch (Exception e) {
- throw new HyracksDataException("initialization of adapter failed", e);
- }
+
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index 2da4e76..f07168a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -35,52 +35,46 @@
* Operator responsible for ingesting data from an external source. This
* operator uses a (configurable) adapter associated with the feed dataset.
*/
-public class FeedIntakeOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private final String adapterFactoryClassName;
- private final Map<String, String> adapterConfiguration;
- private final IAType atype;
- private final FeedId feedId;
+ private final String adapterFactoryClassName;
+ private final Map<String, Object> adapterConfiguration;
+ private final IAType atype;
+ private final FeedId feedId;
+ private final IAdapterFactory datasourceAdapterFactory;
- private transient IAdapterFactory datasourceAdapterFactory;
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+ Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc,
+ IAdapterFactory datasourceAdapterFactory) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = rDesc;
+ this.adapterFactoryClassName = adapter;
+ this.adapterConfiguration = arguments;
+ this.atype = atype;
+ this.feedId = feedId;
+ this.datasourceAdapterFactory = datasourceAdapterFactory;
+ }
- public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
- String adapter, Map<String, String> arguments, ARecordType atype,
- RecordDescriptor rDesc) {
- super(spec, 1, 1);
- recordDescriptors[0] = rDesc;
- this.adapterFactoryClassName = adapter;
- this.adapterConfiguration = arguments;
- this.atype = atype;
- this.feedId = feedId;
- }
-
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition,
- int nPartitions) throws HyracksDataException {
- ITypedDatasourceAdapter adapter;
- try {
- datasourceAdapterFactory = (IAdapterFactory) Class.forName(
- adapterFactoryClassName).newInstance();
- if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
- adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
- .createAdapter(adapterConfiguration, atype);
- } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
- adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
- .createAdapter(adapterConfiguration);
- } else {
- throw new IllegalStateException(
- " Unknown adapter factory type for "
- + adapterFactoryClassName);
- }
- adapter.initialize(ctx);
- } catch (Exception e) {
- throw new HyracksDataException("initialization of adapter failed",
- e);
- }
- return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
- }
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ ITypedDatasourceAdapter adapter;
+ try {
+ if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+ adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
+ .createAdapter(adapterConfiguration, atype);
+ } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+ adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
+ .createAdapter(adapterConfiguration);
+ } else {
+ throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+ }
+ adapter.initialize(ctx);
+ } catch (Exception e) {
+ throw new HyracksDataException("initialization of adapter failed", e);
+ }
+ return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
+ }
}
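The substance of this rewrite: because IAdapterFactory now extends Serializable, the factory is instantiated once at job-compilation time and shipped inside the descriptor, replacing the per-node Class.forName instantiation that previously ran in createPushRuntime. A sketch of the new construction site (spec, arguments, recordType, and recordDescriptor are illustrative):

    IAdapterFactory factory = (IAdapterFactory) Class.forName(adapterFactoryClassName).newInstance();
    FeedIntakeOperatorDescriptor feedIntake = new FeedIntakeOperatorDescriptor(spec, feedId,
            adapterFactoryClassName, arguments, recordType, recordDescriptor, factory);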
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
index d0dbb98..fd40b03 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
@@ -51,7 +52,7 @@
public void open() throws HyracksDataException {
if (adapter instanceof IManagedFeedAdapter) {
feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
- feedInboxMonitor.start();
+ AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
feedManager.registerFeedMsgQueue(feedId, inbox);
}
writer.open();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 440ee8c..23e545d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -36,7 +36,7 @@
private static final long serialVersionUID = 1L;
- protected Map<String, String> configuration;
+ protected Map<String, Object> configuration;
protected transient AlgebricksPartitionConstraint partitionConstraint;
protected IAType atype;
protected IHyracksTaskContext ctx;
@@ -51,15 +51,15 @@
typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
}
- protected static final Map<String, String> formatToParserFactoryMap = initializeFormatParserFactoryMap();
+ protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
public static final String KEY_FORMAT = "format";
public static final String KEY_PARSER_FACTORY = "parser";
public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
public static final String FORMAT_ADM = "adm";
- private static Map<String, String> initializeFormatParserFactoryMap() {
- Map<String, String> map = new HashMap<String, String>();
+ private static Map<String, Object> initializeFormatParserFactoryMap() {
+ Map<String, Object> map = new HashMap<String, Object>();
map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
return map;
@@ -77,7 +77,7 @@
* @param attribute
* The attribute whose value needs to be obtained.
*/
- public String getAdapterProperty(String attribute) {
+ public Object getAdapterProperty(String attribute) {
return configuration.get(attribute);
}
@@ -86,7 +86,7 @@
*
-     * @return A Map<String,String> instance representing the adapter configuration.
+     * @return A Map<String, Object> instance representing the adapter configuration.
*/
- public Map<String, String> getConfiguration() {
+ public Map<String, Object> getConfiguration() {
return configuration;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..8976f7a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -70,9 +70,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..8ab252d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -69,7 +69,7 @@
public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
@Override
- public abstract void configure(Map<String, String> arguments) throws Exception;
+ public abstract void configure(Map<String, Object> arguments) throws Exception;
@Override
public abstract AdapterType getAdapterType();
@@ -82,14 +82,14 @@
}
protected void configureFormat() throws Exception {
- String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
+ String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
if (parserFactoryClassname == null) {
- String specifiedFormat = configuration.get(KEY_FORMAT);
+ String specifiedFormat = (String) configuration.get(KEY_FORMAT);
if (specifiedFormat == null) {
throw new IllegalArgumentException(" Unspecified data format");
} else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
- } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+        } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
} else {
throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e05b2f..02eacf5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -14,18 +14,9 @@
*/
package edu.uci.ics.asterix.external.dataset.adapter;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
@@ -37,14 +28,9 @@
import org.apache.hadoop.mapred.TextInputFormat;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
/**
* Provides functionality for fetching external data stored in an HDFS instance.
@@ -53,118 +39,29 @@
public class HDFSAdapter extends FileSystemBasedAdapter {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
- public static final String KEY_HDFS_URL = "hdfs";
- public static final String KEY_INPUT_FORMAT = "input-format";
- public static final String INPUT_FORMAT_TEXT = "text-input-format";
- public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
- private Object[] inputSplits;
+ private transient String[] readSchedule;
+ private transient boolean executed[];
+ private transient InputSplit[] inputSplits;
private transient JobConf conf;
- private InputSplitsProxy inputSplitsProxy;
- private static final Map<String, String> formatClassNames = initInputFormatMap();
+ private transient AlgebricksPartitionConstraint clusterLocations;
- private static Map<String, String> initInputFormatMap() {
- Map<String, String> formatClassNames = new HashMap<String, String>();
- formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
- formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
- return formatClassNames;
- }
+ private transient String nodeName;
- public HDFSAdapter(IAType atype) {
+ public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+ AlgebricksPartitionConstraint clusterLocations) {
super(atype);
+ this.readSchedule = readSchedule;
+ this.executed = executed;
+ this.inputSplits = inputSplits;
+ this.conf = conf;
+ this.clusterLocations = clusterLocations;
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
- configuration = arguments;
+ public void configure(Map<String, Object> arguments) throws Exception {
+ this.configuration = arguments;
configureFormat();
- configureJobConf();
- configureSplits();
- }
-
- private void configureSplits() throws IOException {
- if (inputSplitsProxy == null) {
- inputSplits = conf.getInputFormat().getSplits(conf, 0);
- }
- inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
- }
-
- private void configurePartitionConstraint() throws Exception {
- List<String> locations = new ArrayList<String>();
- Random random = new Random();
- boolean couldConfigureLocationConstraints = false;
- try {
- Map<String, Set<String>> nodeControllers = AsterixRuntimeUtil.getNodeControllerMap();
- for (Object inputSplit : inputSplits) {
- String[] dataNodeLocations = ((InputSplit) inputSplit).getLocations();
- if (dataNodeLocations == null || dataNodeLocations.length == 0) {
- throw new IllegalArgumentException("No datanode locations found: check hdfs path");
- }
-
- // loop over all replicas until a split location coincides
- // with an asterix datanode location
- for (String datanodeLocation : dataNodeLocations) {
- Set<String> nodeControllersAtLocation = null;
- try {
- nodeControllersAtLocation = nodeControllers.get(AsterixRuntimeUtil
- .getIPAddress(datanodeLocation));
- } catch (UnknownHostException uhe) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "Unknown host :" + datanodeLocation);
- }
- continue;
- }
- if (nodeControllersAtLocation == null || nodeControllersAtLocation.size() == 0) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "No node controller found at " + datanodeLocation
- + " will look at replica location");
- }
- couldConfigureLocationConstraints = false;
- } else {
- int locationIndex = random.nextInt(nodeControllersAtLocation.size());
- String chosenLocation = (String) nodeControllersAtLocation.toArray()[locationIndex];
- locations.add(chosenLocation);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "split : " + inputSplit + " to be processed by :" + chosenLocation);
- }
- couldConfigureLocationConstraints = true;
- break;
- }
- }
-
- /* none of the replica locations coincides with an Asterix
- node controller location.
- */
- if (!couldConfigureLocationConstraints) {
- List<String> allNodeControllers = AsterixRuntimeUtil.getAllNodeControllers();
- int locationIndex = random.nextInt(allNodeControllers.size());
- String chosenLocation = allNodeControllers.get(locationIndex);
- locations.add(chosenLocation);
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "No local node controller found to process split : " + inputSplit
- + " will be processed by a remote node controller:" + chosenLocation);
- }
- }
- }
- partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "Encountered exception :" + e + " using count constraints");
- }
- partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
- }
- }
-
- private JobConf configureJobConf() throws Exception {
- conf = new JobConf();
- conf.set("fs.default.name", configuration.get(KEY_HDFS_URL).trim());
- conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- conf.setClassLoader(HDFSAdapter.class.getClassLoader());
- conf.set("mapred.input.dir", configuration.get(KEY_PATH).trim());
- conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim()));
- return conf;
}
public AdapterType getAdapterType() {
@@ -174,7 +71,7 @@
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
- inputSplits = inputSplitsProxy.toInputSplits(conf);
+ this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
}
private Reporter getReporter() {
@@ -215,98 +112,124 @@
return reporter;
}
- @SuppressWarnings("unchecked")
@Override
public InputStream getInputStream(int partition) throws IOException {
- try {
- InputStream inputStream;
- if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
- SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
- inputStream = new HDFSStream(reader, ctx);
- } else {
- try {
+
+ return new InputStream() {
+
+ private RecordReader<Object, Text> reader;
+ private Object key;
+ private Text value;
+ private boolean hasMore = false;
+ private int EOL = "\n".getBytes()[0];
+ private Text pendingValue = null;
+ private int currentSplitIndex = 0;
+
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions on the same machine
+ */
+ synchronized (executed) {
+ if (executed[currentSplitIndex] == false) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = (Text) reader.createValue();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
+ }
+
+ int numBytes = 0;
+ if (pendingValue != null) {
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+ buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+ numBytes += pendingValue.getLength() + 1;
+ pendingValue = null;
+ }
+
+ while (numBytes < len) {
+ hasMore = reader.next(key, value);
+ if (!hasMore) {
+ while (moveToNext()) {
+ hasMore = reader.next(key, value);
+ if (hasMore) {
+ //move to the next non-empty split
+ break;
+ }
+ }
+ }
+ if (!hasMore) {
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+ int sizeOfNextTuple = value.getLength() + 1;
+ if (numBytes + sizeOfNextTuple > len) {
+ // cannot add this tuple to the current buffer,
+ // but the reader has already moved past the fetched tuple,
+ // so we need to store it for a subsequent read call
+ // and return it then.
+ pendingValue = value;
+ break;
+ } else {
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ }
+ return numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ private RecordReader getRecordReader(int splitIndex) throws IOException {
+ if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+ return reader;
+ } else {
TextInputFormat format = (TextInputFormat) conf.getInputFormat();
RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
- inputStream = new HDFSStream(reader, ctx);
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e);
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+ return reader;
}
}
- return inputStream;
- } catch (Exception e) {
- throw new IOException(e);
- }
+
+ };
}
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- if (partitionConstraint == null) {
- configurePartitionConstraint();
- }
- return partitionConstraint;
- }
-
-}
-
-class HDFSStream extends InputStream {
-
- private RecordReader<Object, Text> reader;
- private final Object key;
- private final Text value;
- private boolean hasMore = false;
- private static final int EOL = "\n".getBytes()[0];
- private Text pendingValue = null;
-
- public HDFSStream(RecordReader<Object, Text> reader, IHyracksTaskContext ctx) throws Exception {
- this.reader = reader;
- key = reader.createKey();
- try {
- value = (Text) reader.createValue();
- } catch (ClassCastException cce) {
- throw new Exception("value is not of type org.apache.hadoop.io.Text"
- + " type not supported in sequence file format", cce);
- }
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- int numBytes = 0;
- if (pendingValue != null) {
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
- pendingValue = null;
- }
-
- while (numBytes < len) {
- hasMore = reader.next(key, value);
- if (!hasMore) {
- return (numBytes == 0) ? -1 : numBytes;
- }
- int sizeOfNextTuple = value.getLength() + 1;
- if (numBytes + sizeOfNextTuple > len) {
- // cannot add tuple to current buffer
- // but the reader has moved pass the fetched tuple
- // we need to store this for a subsequent read call.
- // and return this then.
- pendingValue = value;
- break;
- } else {
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- }
- }
- return numBytes;
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
+ return clusterLocations;
}
}
\ No newline at end of file
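Note: the rewritten HDFSAdapter consumes splits according to a precomputed readSchedule. Every partition on a node walks the schedule, and a shared executed[] array guarded by synchronized guarantees each split is claimed exactly once even when several partitions run on the same machine. A self-contained sketch of that claiming scheme, assuming one shared boolean array per job (node names and the Thread driver are illustrative):

    // Sketch of the split-claiming pattern used by the new HDFSAdapter.
    public class SplitClaimSketch {
        public static void main(String[] args) throws InterruptedException {
            final String[] readSchedule = { "nc1", "nc1", "nc2", "nc1" };
            final boolean[] executed = new boolean[readSchedule.length];
            final String nodeName = "nc1"; // each NC knows its own node id

            Runnable partition = new Runnable() {
                public void run() {
                    for (int i = 0; i < readSchedule.length; i++) {
                        if (!readSchedule[i].equals(nodeName)) {
                            continue; // split scheduled to another node
                        }
                        synchronized (executed) {
                            if (executed[i]) {
                                continue; // already claimed by a sibling partition
                            }
                            executed[i] = true;
                        }
                        System.out.println(Thread.currentThread().getName() + " reads split " + i);
                    }
                }
            };
            Thread p0 = new Thread(partition, "partition-0");
            Thread p1 = new Thread(partition, "partition-1");
            p0.start();
            p1.start();
            p0.join();
            p1.join();
        }
    }

Each split is printed by exactly one thread, mirroring how the adapter's moveToNext() hands each HDFS split to a single reader.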
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..5e48834 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,9 @@
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,6 +27,7 @@
/**
* Provides the functionality of fetching data in form of ADM records from a Hive dataset.
*/
+@SuppressWarnings("deprecation")
public class HiveAdapter extends AbstractDatasourceAdapter {
private static final long serialVersionUID = 1L;
@@ -37,8 +41,9 @@
private HDFSAdapter hdfsAdapter;
- public HiveAdapter(IAType atype) {
- this.hdfsAdapter = new HDFSAdapter(atype);
+ public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+ AlgebricksPartitionConstraint clusterLocations) {
+ this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
this.atype = atype;
}
@@ -48,33 +53,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
- configuration = arguments;
- configureHadoopAdapter();
- }
-
- private void configureHadoopAdapter() throws Exception {
- String database = configuration.get(HIVE_DATABASE);
- String tablePath = null;
- if (database == null) {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
- } else {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
- + configuration.get(HIVE_TABLE);
- }
- configuration.put(HDFSAdapter.KEY_PATH, tablePath);
- if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
- throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
- }
-
- if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
- .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
- throw new IllegalArgumentException("file input format" + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
- + " is not supported");
- }
-
- hdfsAdapter = new HDFSAdapter(atype);
- hdfsAdapter.configure(configuration);
+ public void configure(Map<String, Object> arguments) throws Exception {
+ this.configuration = arguments;
+ this.hdfsAdapter.configure(arguments);
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
index b0dc32f..031f34f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
@@ -70,7 +70,7 @@
* @return String the value corresponding to the configuration parameter
* represented by the key- attributeKey.
*/
- public String getAdapterProperty(String propertyKey);
+ public Object getAdapterProperty(String propertyKey);
/**
* Configures the IDatasourceAdapter instance.
@@ -100,7 +100,7 @@
* providing all arguments as a set of (key,value) pairs. These
* arguments are put into the metadata.
*/
- public void configure(Map<String, String> arguments) throws Exception;
+ public void configure(Map<String, Object> arguments) throws Exception;
/**
* Returns a list of partition constraints. A partition constraint can be a
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index ef39d45..9abc92a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -44,9 +44,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
this.configuration = arguments;
- String[] splits = arguments.get(KEY_PATH).split(",");
+ String[] splits = ((String) arguments.get(KEY_PATH)).split(",");
configureFileSplits(splits);
configureFormat();
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ebfbcad..66d9f98 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -31,9 +31,8 @@
*/
public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
-
private static final long serialVersionUID = 1L;
-
+
public static final String QUERY = "query";
public static final String INTERVAL = "interval";
@@ -49,7 +48,7 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
@@ -69,12 +68,12 @@
}
@Override
- public void stop() {
+ public void stop() {
tweetClient.stop();
}
@Override
- public void alter(Map<String, String> properties) {
+ public void alter(Map<String, String> properties) {
alterRequested = true;
this.alteredParams = properties;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..06cddfd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -63,8 +63,8 @@
tupleFieldValues = new String[recordType.getFieldNames().length];
}
- public void initialize(Map<String, String> params) {
- this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+ public void initialize(Map<String, Object> params) {
+ this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
this.query = new Query(keywords);
query.setRpp(100);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..ccd6516 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -72,9 +72,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
@@ -94,7 +94,7 @@
}
protected void reconfigure(Map<String, String> arguments) {
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty != null) {
initializeFeedURLs(rssURLProperty);
}
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
index 2b884ef..abf0420 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
@@ -49,6 +49,7 @@
import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.Coredump;
import edu.uci.ics.asterix.event.driver.EventDriver;
import edu.uci.ics.asterix.event.management.EventrixClient;
import edu.uci.ics.asterix.event.management.EventUtil;
@@ -223,6 +224,14 @@
}
configuration.setStore(stores);
+ List<Coredump> coredump = new ArrayList<Coredump>();
+ String coredumpDir = null;
+ for (Node node : cluster.getNode()) {
+ coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+ coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir));
+ }
+ configuration.setCoredump(coredump);
+
File asterixConfDir = new File(InstallerDriver.getAsterixDir() + File.separator + asterixInstanceName);
asterixConfDir.mkdirs();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index f9f5260..c33d130 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -43,38 +43,29 @@
* received from the metadata node, to avoid contacting the metadata node
* repeatedly. We assume that this metadata manager is the only metadata manager
* in an Asterix cluster. Therefore, no separate cache-invalidation mechanism is
- * needed at this point.
- * Assumptions/Limitations:
- * The metadata subsystem is started during NC Bootstrap start, i.e., when
- * Asterix is deployed.
- * The metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix
- * is undeployed.
- * The metadata subsystem consists of the MetadataManager and the MatadataNode.
- * The MetadataManager provides users access to the metadata.
- * The MetadataNode implements direct access to the storage layer on behalf of
- * the MetadataManager, and translates the binary representation of ADM into
- * Java objects for consumption by the MetadataManager's users.
- * There is exactly one instance of the MetadataManager and of the MetadataNode
- * in the cluster, which may or may not be co-located on the same machine (or in
- * the same JVM).
- * The MetadataManager exists in the same JVM as its user's (e.g., the query
- * compiler).
- * The MetadataNode exists in the same JVM as it's transactional components
- * (LockManager, LogManager, etc.)
- * Users shall access the metadata only through the MetadataManager, and never
- * via the MetadataNode directly.
+ * needed at this point. Assumptions/Limitations: The metadata subsystem is
+ * started during NC Bootstrap start, i.e., when Asterix is deployed. The
+ * metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix is
+ * undeployed. The metadata subsystem consists of the MetadataManager and the
+ * MetadataNode. The MetadataManager provides users access to the metadata. The
+ * MetadataNode implements direct access to the storage layer on behalf of the
+ * MetadataManager, and translates the binary representation of ADM into Java
+ * objects for consumption by the MetadataManager's users. There is exactly one
+ * instance of the MetadataManager and of the MetadataNode in the cluster, which
+ * may or may not be co-located on the same machine (or in the same JVM). The
+ * MetadataManager exists in the same JVM as its users (e.g., the query
+ * compiler). The MetadataNode exists in the same JVM as its transactional
+ * components (LockManager, LogManager, etc.). Users shall access the metadata
+ * only through the MetadataManager, and never via the MetadataNode directly.
* Multiple threads may issue requests to the MetadataManager concurrently. For
* the sake of accessing metadata, we assume a transaction consists of one
- * thread.
- * Users are responsible for locking the metadata (using the MetadataManager
- * API) before issuing requests.
- * The MetadataNode is responsible for acquiring finer-grained locks on behalf
- * of requests from the MetadataManager. Currently, locks are acquired per
- * BTree, since the BTree does not acquire even finer-grained locks yet
- * internally.
- * The metadata can be queried with AQL DML like any other dataset, but can only
- * be changed with AQL DDL.
- * The transaction ids for metadata transactions must be unique across the
+ * thread. Users are responsible for locking the metadata (using the
+ * MetadataManager API) before issuing requests. The MetadataNode is responsible
+ * for acquiring finer-grained locks on behalf of requests from the
+ * MetadataManager. Currently, locks are acquired per BTree, since the BTree
+ * does not acquire even finer-grained locks yet internally. The metadata can be
+ * queried with AQL DML like any other dataset, but can only be changed with AQL
+ * DDL. The transaction ids for metadata transactions must be unique across the
* cluster, i.e., metadata transaction ids shall never "accidentally" overlap
* with transaction ids of regular jobs or other metadata transactions.
*/
@@ -220,7 +211,7 @@
@Override
public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
- // add dataset into metadataNode
+ // add dataset into metadataNode
try {
metadataNode.addDataset(ctx.getJobId(), dataset);
} catch (RemoteException e) {
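Note: as the rewrapped javadoc above states, callers lock the metadata through the MetadataManager and perform all access inside a single-threaded metadata transaction. A hedged sketch of that calling convention; only addDataset(ctx, dataset) appears in this patch, while the begin/lock/commit/abort method names are assumptions about the surrounding API:

    // Hedged sketch of the prescribed calling convention; lockMode and
    // dataset are placeholders, and the transaction/lock calls are assumed.
    void addDatasetSafely(Dataset dataset, byte lockMode) throws Exception {
        MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction(); // assumed
        try {
            MetadataManager.INSTANCE.lock(ctx, lockMode);       // assumed; lock before any request
            MetadataManager.INSTANCE.addDataset(ctx, dataset);  // from this patch
            MetadataManager.INSTANCE.commitTransaction(ctx);    // assumed
        } catch (Exception e) {
            MetadataManager.INSTANCE.abortTransaction(ctx);     // assumed
            throw e;
        }
    }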
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 117f492..29bafa9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -34,6 +34,7 @@
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
@@ -101,6 +102,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -119,6 +121,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -157,6 +160,7 @@
private final AsterixStorageProperties storageProperties;
private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+ private static Scheduler hdfsScheduler;
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
@@ -178,6 +182,16 @@
this.defaultDataverse = defaultDataverse;
this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+ try {
+ if (hdfsScheduler == null) {
+ //set the singleton hdfs scheduler
+ hdfsScheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
+ .getClusterControllerInfo().getClientNetPort());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
public void setJobId(JobId jobId) {
@@ -343,8 +357,8 @@
adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
- adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties(),
- itemType);
+ adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+ wrapProperties(datasetDetails.getProperties()), itemType);
} catch (AlgebricksException ae) {
throw ae;
} catch (Exception e) {
@@ -362,7 +376,7 @@
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
- adapterFactoryClassname, datasetDetails.getProperties(), rt, scannerDesc);
+ wrapPropertiesEmpty(datasetDetails.getProperties()), rt, scannerDesc, adapterFactory);
AlgebricksPartitionConstraint constraint;
try {
@@ -427,14 +441,15 @@
}
if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
- adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties());
+ adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(wrapProperties(datasetDetails
+ .getProperties()));
adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
} else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
outputTypeName).getDatatype();
adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
- datasetDetails.getProperties(), adapterOutputType);
+ wrapProperties(datasetDetails.getProperties()), adapterOutputType);
} else {
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
}
@@ -451,7 +466,8 @@
FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
- datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+ this.wrapPropertiesEmpty(datasetDetails.getProperties()), (ARecordType) adapterOutputType, feedDesc,
+ adapterFactory);
AlgebricksPartitionConstraint constraint = null;
try {
@@ -1484,4 +1500,32 @@
return FormatUtils.getDefaultFormat();
}
+ /**
+ * Add the HDFS scheduler and the cluster location constraint into the dataset properties
+ *
+ * @param properties
+ * the original dataset properties
+ * @return a new map containing the original dataset properties and the scheduler/locations
+ */
+ private Map<String, Object> wrapProperties(Map<String, String> properties) {
+ Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+ wrappedProperties.putAll(properties);
+ wrappedProperties.put(HDFSAdapterFactory.SCHEDULER, hdfsScheduler);
+ wrappedProperties.put(HDFSAdapterFactory.CLUSTER_LOCATIONS, getClusterLocations());
+ return wrappedProperties;
+ }
+
+ /**
+ * Adapt the original properties to a string-object map
+ *
+ * @param properties
+ * the original properties
+ * @return the new string-object map
+ */
+ private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+ Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+ wrappedProperties.putAll(properties);
+ return wrappedProperties;
+ }
+
}
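Note: wrapProperties and wrapPropertiesEmpty exist because the adapter configuration channel now carries live runtime objects (the shared HDFS Scheduler and the cluster location constraint) alongside plain string settings. A minimal sketch of the widening, with placeholder key names standing in for HDFSAdapterFactory.SCHEDULER and HDFSAdapterFactory.CLUSTER_LOCATIONS:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: widen a Map<String, String> of dataset properties so
    // live objects can ride along; consumers cast by key on the other end.
    class WrapSketch {
        static Map<String, Object> wrap(Map<String, String> properties, Object scheduler, Object locations) {
            Map<String, Object> wrapped = new HashMap<String, Object>(properties);
            wrapped.put("scheduler", scheduler);         // e.g. the shared hdfsScheduler
            wrapped.put("cluster-locations", locations); // e.g. an AlgebricksPartitionConstraint
            return wrapped;
        }
    }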
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 84b989d..afdf343 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,7 +19,6 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
@@ -47,48 +46,15 @@
IManagedFeedAdapter {
private static final long serialVersionUID = 1L;
+ private FileSystemBasedAdapter coreAdapter;
+ private String format;
- public static final String KEY_FILE_SYSTEM = "fs";
- public static final String LOCAL_FS = "localfs";
- public static final String HDFS = "hdfs";
-
- private final FileSystemBasedAdapter coreAdapter;
- private final Map<String, String> configuration;
- private final String fileSystem;
- private final String format;
-
- public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration) throws Exception {
+ public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+ FileSystemBasedAdapter coreAdapter, String format) throws Exception {
super(atype);
- checkRequiredArgs(configuration);
- fileSystem = configuration.get(KEY_FILE_SYSTEM);
- String adapterFactoryClass = null;
- if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
- adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
- } else if (fileSystem.equals(HDFS)) {
- adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
- } else {
- throw new AsterixException("Unsupported file system type " + fileSystem);
- }
- format = configuration.get(KEY_FORMAT);
- IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(
- adapterFactoryClass).newInstance();
- coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, atype);
this.configuration = configuration;
- }
-
- private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
- if (configuration.get(KEY_FILE_SYSTEM) == null) {
- throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
- }
- if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
- throw new Exception("Record type not specified (output-type-name=?)");
- }
- if (configuration.get(KEY_PATH) == null) {
- throw new Exception("File path not specified (path=?)");
- }
- if (configuration.get(KEY_FORMAT) == null) {
- throw new Exception("File format not specified (format=?)");
- }
+ this.coreAdapter = coreAdapter;
+ this.format = format;
}
@Override
@@ -103,7 +69,7 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
coreAdapter.configure(arguments);
}
@@ -189,16 +155,16 @@
private final ARecordType recordType;
private final IDataParser dataParser;
- private final Map<String, String> configuration;
+ private final Map<String, Object> configuration;
public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
- char fieldDelimiter, Map<String, String> configuration) {
+ char fieldDelimiter, Map<String, Object> configuration) {
this.recordType = recordType;
dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
this.configuration = configuration;
}
- public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
+ public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
this.recordType = recordType;
dataParser = new ADMDataParser();
this.configuration = configuration;
@@ -221,10 +187,10 @@
public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
- Map<String, String> configuration) {
+ Map<String, Object> configuration) {
super(ctx, recType);
this.dataParser = dataParser;
- String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+ String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
if (propValue != null) {
interTupleInterval = Long.parseLong(propValue);
} else {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..bf1c268 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,7 +16,9 @@
import java.util.Map;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
@@ -28,10 +30,37 @@
* source file has been ingested.
*/
public class RateControlledFileSystemBasedAdapterFactory implements IGenericDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final String KEY_FILE_SYSTEM = "fs";
+ public static final String LOCAL_FS = "localfs";
+ public static final String HDFS = "hdfs";
+ public static final String KEY_PATH = "path";
+ public static final String KEY_FORMAT = "format";
+
+ private IGenericDatasetAdapterFactory adapterFactory;
+ private String format;
+ private boolean setup = false;
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
- return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration);
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType type) throws Exception {
+ if (!setup) {
+ checkRequiredArgs(configuration);
+ String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
+ String adapterFactoryClass = null;
+ if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+ adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
+ } else if (fileSystem.equals(HDFS)) {
+ adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
+ } else {
+ throw new AsterixException("Unsupported file system type " + fileSystem);
+ }
+ format = (String) configuration.get(KEY_FORMAT);
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+ setup = true;
+ }
+ return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration,
+ (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, type), format);
}
@Override
@@ -39,4 +68,19 @@
return "file_feed";
}
+ private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+ if (configuration.get(KEY_FILE_SYSTEM) == null) {
+ throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
+ }
+ if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
+ throw new Exception("Record type not specified (output-type-name=?)");
+ }
+ if (configuration.get(KEY_PATH) == null) {
+ throw new Exception("File path not specified (path=?)");
+ }
+ if (configuration.get(KEY_FORMAT) == null) {
+ throw new Exception("File format not specified (format=?)");
+ }
+ }
+
}
\ No newline at end of file
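Note: createAdapter above now performs its one-time setup (argument checks, reflective factory loading) lazily behind a plain setup flag, then delegates creation to the inner factory. That flag is only safe if a factory instance is confined to one thread; below is a hedged sketch of the same lazy-delegation idea with a double-checked guard instead (class and method names are placeholders):

    // Placeholder sketch: lazy one-time initialization plus delegation.
    // The volatile/double-checked guard is an assumption about desired
    // semantics; the patch itself uses an unsynchronized boolean.
    abstract class LazyDelegatingFactory {
        private volatile Object delegate;

        Object getDelegate() throws Exception {
            if (delegate == null) {
                synchronized (this) {
                    if (delegate == null) {
                        delegate = create(); // reflection-based in the patch
                    }
                }
            }
            return delegate;
        }

        protected abstract Object create() throws Exception;
    }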
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
index d5e525a..d583352 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
@@ -437,6 +437,20 @@
return s.toString();
}
+
+ public String coreDump() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\t firstUpgrader: " + firstUpgrader);
+ sb.append("\n\t firstWaiter: " + firstWaiter);
+ sb.append("\n\t lastHolder: " + lastHolder);
+ sb.append("\n\t ISCount: " + ISCount);
+ sb.append("\n\t IXCount: " + IXCount);
+ sb.append("\n\t SCount: " + SCount);
+ sb.append("\n\t XCount: " + XCount);
+ sb.append("\n\t entityResourceHT");
+ sb.append(entityResourceHT.prettyPrint());
+ return sb.toString();
+ }
/////////////////////////////////////////////////////////
// set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
index 5d81e8a..206a3bf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -272,29 +274,35 @@
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- s.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
- .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
- for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
- s.append(j).append(": ");
- s.append("\t" + child.getJobId(j));
- s.append("\t" + child.getDatasetId(j));
- s.append("\t" + child.getPKHashVal(j));
- s.append("\t" + child.getDatasetLockMode(j));
- s.append("\t" + child.getDatasetLockCount(j));
- s.append("\t" + child.getEntityLockMode(j));
- s.append("\t" + child.getEntityLockCount(j));
- s.append("\t" + child.getNextEntityActor(j));
- s.append("\t" + child.getPrevJobResource(j));
- s.append("\t" + child.getNextJobResource(j));
- //s.append("\t" + child.getNextDatasetActor(j));
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+
+ public void coreDump(OutputStream os) {
+ StringBuilder sb = new StringBuilder("\n\t########### EntityInfoManager Status #############\n");
+ int size = pArray.size();
+ ChildEntityInfoArrayManager child;
+
+ sb.append("Number of Child: " + size + "\n");
+ for (int i = 0; i < size; i++) {
+ try {
+ child = pArray.get(i);
+ sb.append("child[" + i + "]");
+ sb.append(child.prettyPrint());
+
+ os.write(sb.toString().getBytes());
+ } catch (IOException e) {
+ //ignore IOException
+ }
+ sb = new StringBuilder();
+ }
+ }
+
+ public int getShrinkTimerThreshold() {
+ return SHRINK_TIMER_THRESHOLD;
+ }
public void initEntityInfo(int slotNum, int jobId, int datasetId, int PKHashVal, byte lockMode) {
pArray.get(slotNum / ChildEntityInfoArrayManager.NUM_OF_SLOTS).initEntityInfo(
@@ -567,6 +575,29 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+ public String prettyPrint() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+ sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+ sb.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
+ .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
+ for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
+ sb.append(j).append(": ");
+ sb.append("\t" + getJobId(j));
+ sb.append("\t" + getDatasetId(j));
+ sb.append("\t" + getPKHashVal(j));
+ sb.append("\t" + getDatasetLockMode(j));
+ sb.append("\t" + getDatasetLockCount(j));
+ sb.append("\t" + getEntityLockMode(j));
+ sb.append("\t" + getEntityLockCount(j));
+ sb.append("\t" + getNextEntityActor(j));
+ sb.append("\t" + getPrevJobResource(j));
+ sb.append("\t" + getNextJobResource(j));
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
//////////////////////////////////////////////////////////////////
// set/get method for each field of EntityInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
index ca00aa2..2fae460 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -271,22 +273,35 @@
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- s.append("\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
- for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
- s.append(j).append(": ");
- s.append("\t" + child.getXCount(j));
- s.append("\t" + child.getSCount(j));
- s.append("\t" + child.getLastHolder(j));
- s.append("\t" + child.getFirstWaiter(j));
- s.append("\t" + child.getUpgrader(j));
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+
+ public void coreDump(OutputStream os) {
+ StringBuilder sb = new StringBuilder("\n\t########### EntityLockInfoManager Status #############\n");
+ int size = pArray.size();
+ ChildEntityLockInfoArrayManager child;
+
+ sb.append("Number of Child: " + size + "\n");
+ for (int i = 0; i < size; i++) {
+ try {
+ child = pArray.get(i);
+ sb.append("child[" + i + "]");
+ sb.append(child.prettyPrint());
+
+ os.write(sb.toString().getBytes());
+ } catch (IOException e) {
+ //ignore IOException
+ }
+ sb = new StringBuilder();
+ }
+ }
+
+ public int getShrinkTimerThreshold() {
+ return SHRINK_TIMER_THRESHOLD;
+ }
//debugging method
public String printWaiters(int slotNum) {
@@ -736,6 +751,23 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+ public String prettyPrint() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+ sb.append("\n\tfreeSlotNum:" + getFreeSlotNum());
+ sb.append("\n\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
+ for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
+ sb.append(j).append(": ");
+ sb.append("\t" + getXCount(j));
+ sb.append("\t" + getSCount(j));
+ sb.append("\t" + getLastHolder(j));
+ sb.append("\t" + getFirstWaiter(j));
+ sb.append("\t" + getUpgrader(j));
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
//////////////////////////////////////////////////////////////////
// set/get method for each field of EntityLockInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index d846603..8bf5304 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -275,6 +275,17 @@
}
return s.toString();
}
+
+ public String coreDump() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\t datasetISLockHT");
+ sb.append(datasetISLockHT.prettyPrint());
+ sb.append("\n\t firstWaitingResource: " + firstWaitingResource);
+ sb.append("\n\t lastHoldingResource: " + lastHoldingResource);
+ sb.append("\n\t upgradingResource: " + upgradingResource);
+ sb.append("\n\t jobCtx.jobId: " + jobCtx.getJobId());
+ return sb.toString();
+ }
/////////////////////////////////////////////////////////
// set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 8fb7494..cf41199 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -15,10 +15,13 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,6 +36,7 @@
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
* An implementation of the ILockManager interface for the
@@ -42,7 +46,7 @@
* @author pouria, kisskys
*/
-public class LockManager implements ILockManager {
+public class LockManager implements ILockManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
//This variable indicates that the dataset granule X lock request is allowed when
@@ -2046,6 +2050,182 @@
unlatchLockTable();
}
}
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+
+ //#. dump Configurable Variables
+ dumpConfVars(os);
+
+ //#. dump jobHT
+ dumpJobInfo(os);
+
+ //#. dump datasetResourceHT
+ dumpDatasetLockInfo(os);
+
+ //#. dump entityLockInfoManager
+ dumpEntityLockInfo(os);
+
+ //#. dump entityInfoManager
+ dumpEntityInfo(os);
+
+ //#. dump lockWaiterManager
+
+ dumpLockWaiterInfo(os);
+ try {
+ os.flush();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+
+ private void dumpConfVars(OutputStream os) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+ sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: " + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ sb.append("\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+ + entityLockInfoManager.getShrinkTimerThreshold());
+ sb.append("\nSHRINK_TIMER_THRESHOLD (entityInfoManager): " + entityInfoManager.getShrinkTimerThreshold());
+ sb.append("\nSHRINK_TIMER_THRESHOLD (lockWaiterManager): " + lockWaiterManager.getShrinkTimerThreshold());
+ sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpJobInfo(OutputStream os) {
+ JobId jobId;
+ JobInfo jobInfo;
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("\n>>dump_begin\t>>----- [JobInfo] -----");
+ Set<Map.Entry<JobId, JobInfo>> entrySet = jobHT.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<JobId, JobInfo> entry : entrySet) {
+ if (entry != null) {
+ jobId = entry.getKey();
+ if (jobId != null) {
+ sb.append("\n" + jobId);
+ } else {
+ sb.append("\nJID:null");
+ }
+
+ jobInfo = entry.getValue();
+ if (jobInfo != null) {
+ sb.append(jobInfo.coreDump());
+ } else {
+ sb.append("\nJobInfo:null");
+ }
+ }
+ }
+ }
+ sb.append("\n>>dump_end\t>>----- [JobInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpDatasetLockInfo(OutputStream os) {
+ DatasetId datasetId;
+ DatasetLockInfo datasetLockInfo;
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("\n>>dump_begin\t>>----- [DatasetLockInfo] -----");
+ Set<Map.Entry<DatasetId, DatasetLockInfo>> entrySet = datasetResourceHT.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<DatasetId, DatasetLockInfo> entry : entrySet) {
+ if (entry != null) {
+ datasetId = entry.getKey();
+ if (datasetId != null) {
+ sb.append("\nDatasetId:" + datasetId.getId());
+ } else {
+ sb.append("\nDatasetId:null");
+ }
+
+ datasetLockInfo = entry.getValue();
+ if (datasetLockInfo != null) {
+ sb.append(datasetLockInfo.coreDump());
+ } else {
+ sb.append("\nDatasetLockInfo:null");
+ }
+ }
+ os.write(sb.toString().getBytes());
+ //create a new sb to avoid possible OOM exception
+ sb = new StringBuilder();
+ }
+ }
+ sb.append("\n>>dump_end\t>>----- [DatasetLockInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpEntityLockInfo(OutputStream os) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [EntityLockInfo] -----");
+ entityLockInfoManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [EntityLockInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpEntityInfo(OutputStream os) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [EntityInfo] -----");
+ entityInfoManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [EntityInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpLockWaiterInfo(OutputStream os) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [LockWaiterInfo] -----");
+ lockWaiterManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [LockWaiterInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
}
class ConsecutiveWakeupContext {
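Note: LockManager.stop(dumpState, os) dumps each subsystem section by section: build a section in a StringBuilder, write it to the stream immediately, start a fresh builder to bound memory, and swallow per-section failures so one bad section cannot abort the rest of the dump. A compact, self-contained sketch of that pattern (section names are placeholders):

    import java.io.IOException;
    import java.io.OutputStream;

    // Sketch of the incremental dump style used above.
    class DumpSketch {
        static void dump(OutputStream os, String[] sections) {
            for (String section : sections) {
                StringBuilder sb = new StringBuilder(); // fresh builder per section
                try {
                    sb.append("\n>>dump_begin\t>>----- [").append(section).append("] -----");
                    sb.append("\n...section body...");
                    sb.append("\n>>dump_end\t>>----- [").append(section).append("] -----\n");
                    os.write(sb.toString().getBytes());
                } catch (Exception e) {
                    // ignore and keep dumping as much as possible
                }
            }
            try {
                os.flush();
            } catch (IOException e) {
                // ignore
            }
        }
    }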
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
index bd414de..c6fcbd5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
/**
@@ -258,28 +260,42 @@
StringBuilder s = new StringBuilder("\n########### LockWaiterManager Status #############\n");
int size = pArray.size();
ChildLockWaiterArrayManager child;
- LockWaiter waiter;
for (int i = 0; i < size; i++) {
child = pArray.get(i);
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
- waiter = child.getLockWaiter(j);
- s.append(j).append(": ");
- s.append("\t" + waiter.getEntityInfoSlot());
- s.append("\t" + waiter.needWait());
- s.append("\t" + waiter.isVictim());
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+ public void coreDump(OutputStream os) {
+ StringBuilder sb = new StringBuilder("\n########### LockWaiterManager Status #############\n");
+ int size = pArray.size();
+ ChildLockWaiterArrayManager child;
+
+ sb.append("Number of Children: " + size + "\n");
+ try {
+ os.write(sb.toString().getBytes());
+ } catch (IOException e) {
+ //ignore IOException
+ }
+ for (int i = 0; i < size; i++) {
+ child = pArray.get(i);
+ //match toString(): skip children whose slot array has been deinitialized
+ if (child.isDeinitialized()) {
+ continue;
+ }
+ sb = new StringBuilder();
+ try {
+ sb.append("child[" + i + "]");
+ sb.append(child.prettyPrint());
+ os.write(sb.toString().getBytes());
+ } catch (IOException e) {
+ //ignore IOException and continue dumping as much as possible.
+ }
+ }
+ }
+
+ public int getShrinkTimerThreshold() {
+ return SHRINK_TIMER_THRESHOLD;
+ }
+
public LockWaiter getLockWaiter(int slotNum) {
return pArray.get(slotNum / ChildLockWaiterArrayManager.NUM_OF_SLOTS).getLockWaiter(
slotNum % ChildLockWaiterArrayManager.NUM_OF_SLOTS);
@@ -364,4 +380,20 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+ public String prettyPrint() {
+ LockWaiter waiter;
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+ sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+ for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
+ waiter = getLockWaiter(j);
+ sb.append(j).append(": ");
+ sb.append("\t" + waiter.getEntityInfoSlot());
+ sb.append("\t" + waiter.needWait());
+ sb.append("\t" + waiter.isVictim());
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
}
\ No newline at end of file
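getLockWaiter above decodes a global slot number into a child index (slotNum / NUM_OF_SLOTS) and an offset within that child (slotNum % NUM_OF_SLOTS). A toy sketch of that two-level slot addressing, with a hypothetical SLOTS_PER_CHILD standing in for ChildLockWaiterArrayManager.NUM_OF_SLOTS:

    import java.util.ArrayList;
    import java.util.List;

    public class SlotAddressingSketch {
        static final int SLOTS_PER_CHILD = 8; // stand-in for NUM_OF_SLOTS

        // Each child owns a fixed-size block of slots; a global slot number
        // is decoded into (child index, local offset).
        static <T> T get(List<T[]> children, int slotNum) {
            int childIndex = slotNum / SLOTS_PER_CHILD;
            int offset = slotNum % SLOTS_PER_CHILD;
            return children.get(childIndex)[offset];
        }

        public static void main(String[] args) {
            List<String[]> children = new ArrayList<String[]>();
            children.add(new String[SLOTS_PER_CHILD]);
            children.add(new String[SLOTS_PER_CHILD]);
            children.get(1)[3] = "waiter-11";
            // global slot 11 -> child 1, offset 3
            System.out.println(get(children, 11));
        }
    }
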
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
index a53c890..05052f0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
@@ -2,6 +2,7 @@
import java.util.LinkedList;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
/**
@@ -26,7 +27,7 @@
this.lockMgr = lockMgr;
this.trigger = new Thread(new TimeoutTrigger(this));
trigger.setDaemon(true);
- trigger.start();
+ AsterixThreadExecutor.INSTANCE.execute(trigger);
}
public void sweep() throws ACIDException {
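This hunk routes the timeout trigger through AsterixThreadExecutor.INSTANCE instead of starting the thread directly, centralizing thread creation for the transaction service. A minimal singleton-executor sketch in that spirit (this is not the actual AsterixThreadExecutor, whose source is outside this patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;

    public class SingletonExecutorSketch {
        public static final SingletonExecutorSketch INSTANCE = new SingletonExecutorSketch();

        // All service threads come from one factory, which centralizes
        // thread naming and the daemon flag.
        private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "txn-service-thread");
                t.setDaemon(true);
                return t;
            }
        });

        private SingletonExecutorSketch() {
        }

        public void execute(Runnable r) {
            executor.execute(r);
        }

        public static void main(String[] args) throws InterruptedException {
            INSTANCE.execute(new Runnable() {
                public void run() {
                    System.out.println("trigger running on a shared executor thread");
                }
            });
            Thread.sleep(100); // let the daemon worker run before the JVM exits
        }
    }

Note that once the trigger is submitted as a plain Runnable, the earlier trigger.setDaemon(true) call no longer affects the thread that runs it; the executor's ThreadFactory decides the daemon flag.
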
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 318996a..205c5f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -31,6 +32,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.FileBasedBuffer;
import edu.uci.ics.asterix.common.transactions.FileUtil;
@@ -49,8 +51,9 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
-public class LogManager implements ILogManager {
+public class LogManager implements ILogManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
@@ -185,7 +188,7 @@
*/
logPageFlusher = new LogPageFlushThread(this);
logPageFlusher.setDaemon(true);
- logPageFlusher.start();
+ AsterixThreadExecutor.INSTANCE.execute(logPageFlusher);
}
public int getLogPageIndex(long lsnValue) {
@@ -758,6 +761,60 @@
map.clear();
}
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+ //#. dump Configurable Variables
+ dumpConfVars(os);
+
+ //#. dump LSNInfo
+ dumpLSNInfo(os);
+
+ try {
+ os.flush();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+
+ private void dumpConfVars(OutputStream os) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+ sb.append(logManagerProperties.toString());
+ sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpLSNInfo(OutputStream os) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n>>dump_begin\t>>----- [LSNInfo] -----");
+ sb.append("\nstartingLSN: " + startingLSN);
+ sb.append("\ncurrentLSN: " + lsn.get());
+ sb.append("\nlastFlushedLSN: " + lastFlushedLSN.get());
+ sb.append("\n>>dump_end\t>>----- [LSNInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
}
/*
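LogManager's start()/stop(dumpState, os) above illustrate the contract behind the new ILifeCycleComponent implementations: on shutdown, optionally dump diagnostic state to the supplied stream, and never let an I/O failure abort the rest of the shutdown. A minimal component following that contract (the interface shape is inferred from the methods implemented in this patch):

    import java.io.IOException;
    import java.io.OutputStream;

    public class LifeCycleSketch {
        // Interface shape as implemented in this patch: start(), and stop()
        // with an optional state dump.
        interface ILifeCycleComponent {
            void start();

            void stop(boolean dumpState, OutputStream os);
        }

        static class Component implements ILifeCycleComponent {
            private long counter;

            public void start() {
                counter = 42; // stand-in for real initialization
            }

            public void stop(boolean dumpState, OutputStream os) {
                if (dumpState) {
                    try {
                        os.write(("\ncounter: " + counter + "\n").getBytes());
                        os.flush();
                    } catch (IOException e) {
                        // a failed dump must not block shutdown
                    }
                }
            }
        }

        public static void main(String[] args) {
            Component c = new Component();
            c.start();
            c.stop(true, System.out);
        }
    }
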
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 50d4625..1e85c35 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -57,6 +58,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
@@ -83,7 +85,7 @@
* not in place completely. Once we have physical logging implemented, we would
* add support for crash recovery.
*/
-public class RecoveryManager implements IRecoveryManager {
+public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
@@ -430,7 +432,7 @@
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting sharp checkpoint ... ");
}
-
+
LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
String logDir = logMgr.getLogManagerProperties().getLogDir();
@@ -522,7 +524,7 @@
if (isSharpCheckpoint) {
logMgr.renewLogFiles();
}
-
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Completed sharp checkpoint.");
}
@@ -782,6 +784,16 @@
+ undoCount);
}
}
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ //no op
+ }
}
class TxnId {
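RecoveryManager implements the same interface with empty start() and stop() bodies. If more components end up with no-op lifecycles, an adapter base class would keep that boilerplate in one place; a possible refactoring sketch, not part of the patch:

    import java.io.OutputStream;

    public class NoOpLifeCycleSketch {
        interface ILifeCycleComponent {
            void start();

            void stop(boolean dumpState, OutputStream os);
        }

        // Base class for components that must be registered for lifecycle
        // management but have nothing to initialize or dump.
        static abstract class NoOpLifeCycleComponent implements ILifeCycleComponent {
            public void start() {
                // no op
            }

            public void stop(boolean dumpState, OutputStream os) {
                // no op
            }
        }

        static class RecoveryLike extends NoOpLifeCycleComponent {
            // real recovery logic would live here
        }

        public static void main(String[] args) {
            ILifeCycleComponent c = new RecoveryLike();
            c.start();
            c.stop(false, System.out);
        }
    }
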
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 2659d8f..d4360ef 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -172,5 +172,15 @@
return (o == this);
}
-
+ public String prettyPrint() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n" + jobId + "\n");
+ sb.append("transactionType: " + transactionType);
+ sb.append("firstLogLocator: " + firstLogLocator.getLsn() + "\n");
+ sb.append("lastLogLocator: " + lastLogLocator.getLsn() + "\n");
+ sb.append("TransactionState: " + txnState + "\n");
+ sb.append("startWaitTime: " + startWaitTime + "\n");
+ sb.append("status: " + status + "\n");
+ return sb.toString();
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 4e8808f..04f12ac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -14,8 +14,11 @@
*/
package edu.uci.ics.asterix.transaction.management.service.transaction;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -26,15 +29,18 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
* An implementation of the @see ITransactionManager interface that provides
* implementation of APIs for governing the lifecycle of a transaction.
*/
-public class TransactionManager implements ITransactionManager {
+public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
+
+ public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
private final TransactionSubsystem transactionProvider;
- private Map<JobId, ITransactionContext> ITransactionContextRepository = new HashMap<JobId, ITransactionContext>();
+ private Map<JobId, ITransactionContext> transactionContextRepository = new HashMap<JobId, ITransactionContext>();
private AtomicInteger maxJobId = new AtomicInteger(0);
public TransactionManager(TransactionSubsystem provider) {
@@ -61,7 +67,7 @@
} finally {
txnContext.releaseResources();
transactionProvider.getLockManager().releaseLocks(txnContext);
- ITransactionContextRepository.remove(txnContext.getJobId());
+ transactionContextRepository.remove(txnContext.getJobId());
txnContext.setTxnState(TransactionState.ABORTED);
}
}
@@ -72,7 +78,7 @@
setMaxJobId(jobId.getId());
ITransactionContext txnContext = new TransactionContext(jobId, transactionProvider);
synchronized (this) {
- ITransactionContextRepository.put(jobId, txnContext);
+ transactionContextRepository.put(jobId, txnContext);
}
return txnContext;
}
@@ -80,13 +86,13 @@
@Override
public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
setMaxJobId(jobId.getId());
- synchronized (ITransactionContextRepository) {
+ synchronized (transactionContextRepository) {
- ITransactionContext context = ITransactionContextRepository.get(jobId);
+ ITransactionContext context = transactionContextRepository.get(jobId);
if (context == null) {
- context = ITransactionContextRepository.get(jobId);
context = new TransactionContext(jobId, transactionProvider);
- ITransactionContextRepository.put(jobId, context);
+ transactionContextRepository.put(jobId, context);
}
return context;
}
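With the redundant second get dropped, getTransactionContext is the classic get-or-create-under-a-lock pattern. On Java 8+ the same logic collapses to computeIfAbsent on a ConcurrentHashMap; a sketch of that alternative (a suggestion only, not what the patch does):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class GetOrCreateSketch {
        static class TxnContext {
            final int jobId;

            TxnContext(int jobId) {
                this.jobId = jobId;
            }
        }

        private final Map<Integer, TxnContext> repository = new ConcurrentHashMap<Integer, TxnContext>();

        // Atomic get-or-create: the factory runs at most once per key.
        TxnContext getTransactionContext(int jobId) {
            return repository.computeIfAbsent(jobId, id -> new TxnContext(id));
        }

        public static void main(String[] args) {
            GetOrCreateSketch mgr = new GetOrCreateSketch();
            System.out.println(mgr.getTransactionContext(7) == mgr.getTransactionContext(7)); // true
        }
    }
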
@@ -107,13 +113,13 @@
if (PKHashVal != -1) {
transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
/*****************************
- try {
- //decrease the transaction reference count on index
- txnContext.decreaseActiveTransactionCountOnIndexes();
- } catch (HyracksDataException e) {
- throw new ACIDException("failed to complete index operation", e);
- }
- *****************************/
+ * try {
+ * //decrease the transaction reference count on index
+ * txnContext.decreaseActiveTransactionCountOnIndexes();
+ * } catch (HyracksDataException e) {
+ * throw new ACIDException("failed to complete index operation", e);
+ * }
+ *****************************/
return;
}
@@ -131,7 +137,7 @@
} finally {
txnContext.releaseResources();
transactionProvider.getLockManager().releaseLocks(txnContext); // release
- ITransactionContextRepository.remove(txnContext.getJobId());
+ transactionContextRepository.remove(txnContext.getJobId());
txnContext.setTxnState(TransactionState.COMMITTED);
}
}
@@ -151,12 +157,69 @@
public TransactionSubsystem getTransactionProvider() {
return transactionProvider;
}
-
+
public void setMaxJobId(int jobId) {
maxJobId.set(Math.max(maxJobId.get(), jobId));
}
-
+
public int getMaxJobId() {
return maxJobId.get();
}
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+ //#. dump TxnContext
+ dumpTxnContext(os);
+
+ try {
+ os.flush();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+
+ private void dumpTxnContext(OutputStream os) {
+ JobId jobId;
+ ITransactionContext txnCtx;
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+ Set<Map.Entry<JobId, ITransactionContext>> entrySet = transactionContextRepository.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<JobId, ITransactionContext> entry : entrySet) {
+ if (entry != null) {
+ jobId = entry.getKey();
+ if (jobId != null) {
+ sb.append("\n" + jobId);
+ } else {
+ sb.append("\nJID:null");
+ }
+
+ txnCtx = entry.getValue();
+ if (txnCtx != null) {
+ sb.append(txnCtx.prettyPrint());
+ } else {
+ sb.append("\nTxnCtx:null");
+ }
+ }
+ }
+ }
+
+ sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
}