Improvement on Cursor for Delimited Data
This change allows the delimited-data field cursor to parse pre-read records (char[] buffers) in addition to character streams.
Change-Id: I84ff40db664633c633277e9cc0ffa534cda9f26a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/567
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
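A minimal usage sketch of the two modes, based only on the constructor and the new nextRecord(char[], int) overload shown in this change; the field-iteration call that would follow each nextRecord() is not part of this diff and is only referenced in a comment.

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

public class CursorUsageSketch {
    public static void main(String[] args) throws IOException {
        // Stream mode (existing behavior): the cursor reads from a Reader
        // and manages its own growable char buffer.
        Reader streamIn = new StringReader("a|b|c\nd|e|f\n");
        FieldCursorForDelimitedDataParser streamCursor =
                new FieldCursorForDelimitedDataParser(streamIn, '|', '"');
        streamCursor.nextRecord(); // advance to the first record in the stream

        // Record mode (added by this change): pass a null Reader and hand
        // each pre-read record to the cursor as a char[] plus its length.
        FieldCursorForDelimitedDataParser recordCursor =
                new FieldCursorForDelimitedDataParser(null, '|', '"');
        char[] record = "a|b|c".toCharArray();
        recordCursor.nextRecord(record, record.length);
        // Field iteration (not shown in this hunk) would then walk the record
        // and expose each field through fStart/fEnd on the public buffer.
    }
}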
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
index 7f287cc..a93de4c 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
@@ -43,7 +43,7 @@
* - Stream to write data to.
*/
public void serialize(T instance, DataOutput out) throws HyracksDataException;
-
+
/*
* TODO: Add a new method:
* T deserialize(DataInput in, T mutable)
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index e7f53bc..3d96224 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -25,45 +25,48 @@
public class FieldCursorForDelimitedDataParser {
private enum State {
- INIT,
- IN_RECORD,
- EOR,
- CR,
- EOF
+ INIT, //initial state
+ IN_RECORD, //cursor is inside record
+ EOR, //cursor is at end of record
+ CR, //cursor is at a carriage return
+ EOF //end of stream reached
}
- // public variables will be used by delimited data parser
- public char[] buffer;
- public int fStart;
- public int fEnd;
- public int recordCount;
- public int fieldCount;
- public int doubleQuoteCount;
- public boolean isDoubleQuoteIncludedInThisField;
+ public char[] buffer; //buffer that holds the input coming from the underlying input stream
+ public int fStart; //start position for field
+ public int fEnd; //end position for field
+ public int recordCount; //count of records
+ public int fieldCount; //count of fields in current record
+ public int doubleQuoteCount; //count of double quotes
+ public boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes
- private static final int INITIAL_BUFFER_SIZE = 4096;
- private static final int INCREMENT = 4096;
+ private static final int INITIAL_BUFFER_SIZE = 4096;//initial buffer size
+ private static final int INCREMENT = 4096; //increment size
- private final Reader in;
+ private Reader in; //the underlying reader
- private int start;
- private int end;
- private State state;
+ private int start; //start of valid buffer area
+ private int end; //end of valid buffer area
+ private State state; //state (see states above)
- private int lastQuotePosition;
- private int lastDoubleQuotePosition;
- private int lastDelimiterPosition;
- private int quoteCount;
- private boolean startedQuote;
+ private int lastQuotePosition; //position of last quote
+ private int lastDoubleQuotePosition; //position of last double quote
+ private int lastDelimiterPosition; //position of last delimiter
+ private int quoteCount; //count of single quotes
+ private boolean startedQuote; //whether a quote has been started
- private char quote;
- private char fieldDelimiter;
+ private char quote; //the quote character
+ private char fieldDelimiter; //the delimiter
public FieldCursorForDelimitedDataParser(Reader in, char fieldDelimiter, char quote) {
this.in = in;
- buffer = new char[INITIAL_BUFFER_SIZE];
+ if (in != null) {
+ buffer = new char[INITIAL_BUFFER_SIZE];
+ end = 0;
+ } else {
+ end = Integer.MAX_VALUE;
+ }
start = 0;
- end = 0;
state = State.INIT;
this.quote = quote;
this.fieldDelimiter = fieldDelimiter;
@@ -78,6 +81,15 @@
fieldCount = 0;
}
+ public void nextRecord(char[] buffer, int recordLength) throws IOException {
+ recordCount++;
+ fieldCount = 0;
+ start = 0;
+ end = recordLength;
+ state = State.IN_RECORD;
+ this.buffer = buffer;
+ }
+
public boolean nextRecord() throws IOException {
recordCount++;
fieldCount = 0;
@@ -224,12 +236,8 @@
startedQuote = true;
} else {
// In this case, we don't have a quote in the beginning of a field.
- throw new IOException(
- "At record: "
- + recordCount
- + ", field#: "
- + fieldCount
- + " - a quote enclosing a field needs to be placed in the beginning of that field.");
+ throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
+ + " - a quote enclosing a field needs to be placed in the beginning of that field.");
}
}
// Check double quotes - "". We check [start != p-2]
@@ -362,4 +370,4 @@
}
}
}
-}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index fa6ed72..c28c740 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -34,7 +34,6 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -48,7 +47,6 @@
* The scheduler conduct data-local scheduling for data reading on HDFS. This
* class works for Hadoop old API.
*/
-@SuppressWarnings("deprecation")
public class Scheduler {
private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
@@ -75,6 +73,7 @@
* @param ncNameToNcInfos
* @throws HyracksException
*/
+
public Scheduler(String ipAddress, int port) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
@@ -127,7 +126,8 @@
* the hyracks cluster toplogy
* @throws HyracksException
*/
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException {
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
+ throws HyracksException {
this(ncNameToNcInfos);
this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
: new RackAwareNcCollectionBuilder(topology);
@@ -274,8 +274,8 @@
* @throws UnknownHostException
*/
private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
- boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) throws IOException,
- UnknownHostException {
+ boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits)
+ throws IOException, UnknownHostException {
/** scheduling candidates will be ordered inversely according to their popularity */
PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
diff --git a/pom.xml b/pom.xml
index fbe936d..90f80a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,9 @@
<exclude>**/target/**</exclude>
<exclude>**/output/**</exclude>
<exclude>**/*.iml</exclude>
+ <exclude>**/*.prefs</exclude>
+ <exclude>**/.classpath</exclude>
+ <exclude>**/.project</exclude>
</excludes>
</configuration>
</plugin>