merged r1274:1322 from hyracks_dev_next

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1342 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-control/hyracks-control-nc/pom.xml
index 49f2371..202b601 100644
--- a/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-control/hyracks-control-nc/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-control</artifactId>
-    <version>0.2.0-SNAPSHOT</version>
+    <version>0.2.1-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -30,14 +30,14 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-common</artifactId>
-  		<version>0.2.0-SNAPSHOT</version>
+  		<version>0.2.1-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-net</artifactId>
-  		<version>0.2.0-SNAPSHOT</version>
+  		<version>0.2.1-SNAPSHOT</version>
   	</dependency>
   </dependencies>
   <reporting>
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java
new file mode 100644
index 0000000..8916fb1
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+/**
+ * {@link IFileHandle} implementation backed by a {@link RandomAccessFile} and the
+ * {@link FileChannel} obtained from it.
+ * <p>
+ * A handle is created unopened: the file and channel accessors return {@code null} until
+ * {@link #open} succeeds. The class performs no synchronization of its own.
+ */
+public class FileHandle implements IFileHandle {
+    /** Location of the file this handle operates on. */
+    private final FileReference fileRef;
+
+    /** Underlying file; {@code null} until {@link #open} succeeds. */
+    private RandomAccessFile raf;
+
+    /** Channel of {@link #raf}; {@code null} until {@link #open} succeeds. */
+    private FileChannel channel;
+
+    /**
+     * Creates an unopened handle for the given file.
+     *
+     * @param fileRef reference to the file that {@link #open} will operate on
+     */
+    public FileHandle(FileReference fileRef) {
+        this.fileRef = fileRef;
+    }
+
+    /**
+     * Opens the referenced file and caches its channel.
+     * <p>
+     * For {@code READ_WRITE} the missing parent directories are created first, and the sync
+     * mode selects the {@link RandomAccessFile} mode string: {@code "rw"}, {@code "rwd"}
+     * (data written synchronously) or {@code "rws"} (data and metadata written synchronously).
+     * For {@code READ_ONLY} the sync mode is ignored.
+     *
+     * @param rwMode   whether the file is opened for reading only or for reading and writing
+     * @param syncMode durability of writes; only consulted for {@code READ_WRITE}
+     * @throws IOException              if the file cannot be opened in the requested mode
+     * @throws IllegalArgumentException if either mode is not one of the values handled here
+     */
+    public void open(IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode) throws IOException {
+        String mode;
+        switch (rwMode) {
+            case READ_ONLY:
+                mode = "r";
+                break;
+
+            case READ_WRITE:
+                // The mkdirs() result is not checked; a missing directory surfaces when the file is opened below.
+                fileRef.getFile().getAbsoluteFile().getParentFile().mkdirs();
+                switch (syncMode) {
+                    case METADATA_ASYNC_DATA_ASYNC:
+                        mode = "rw";
+                        break;
+
+                    case METADATA_ASYNC_DATA_SYNC:
+                        mode = "rwd";
+                        break;
+
+                    case METADATA_SYNC_DATA_SYNC:
+                        mode = "rws";
+                        break;
+
+                    default:
+                        throw new IllegalArgumentException("Unsupported sync mode: " + syncMode);
+                }
+                break;
+
+            default:
+                throw new IllegalArgumentException("Unsupported read/write mode: " + rwMode);
+        }
+        raf = new RandomAccessFile(fileRef.getFile(), mode);
+        channel = raf.getChannel();
+    }
+
+    /**
+     * Closes the channel and the underlying file.
+     * <p>
+     * Does nothing if the handle was never opened. The file is closed even when closing the
+     * channel fails.
+     *
+     * @throws IOException if closing the channel or the file fails
+     */
+    public void close() throws IOException {
+        if (raf == null) {
+            // open() was never called or did not succeed: nothing to release.
+            return;
+        }
+        try {
+            channel.close();
+        } finally {
+            raf.close();
+        }
+    }
+
+    /**
+     * @return the reference this handle was created with
+     */
+    public FileReference getFileReference() {
+        return fileRef;
+    }
+
+    /**
+     * @return the underlying file, or {@code null} if the handle has not been opened
+     */
+    public RandomAccessFile getRandomAccessFile() {
+        return raf;
+    }
+
+    /**
+     * @return the channel of the underlying file, or {@code null} if the handle has not been opened
+     */
+    public FileChannel getFileChannel() {
+        return channel;
+    }
+
+    /**
+     * Forces pending updates of the file to the storage device via {@link FileChannel#force(boolean)}.
+     * The handle must have been opened.
+     *
+     * @param metadata passed through to {@code force}: whether file metadata must be flushed as well
+     * @throws IOException if the channel reports an I/O error
+     */
+    public void sync(boolean metadata) throws IOException {
+        channel.force(metadata);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
index c3105ab..3b13f32 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -24,8 +24,8 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileHandle;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOFuture;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
@@ -61,7 +61,7 @@
     }
 
     @Override
-    public FileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+    public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
             throws HyracksDataException {
         FileHandle fHandle = new FileHandle(fileRef);
         try {
@@ -73,14 +73,15 @@
     }
 
     @Override
-    public int syncWrite(FileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+    public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
         try {
             int n = 0;
             int remaining = data.remaining();
             while (remaining > 0) {
-                int len = fHandle.getFileChannel().write(data, offset);
+                int len = ((FileHandle) fHandle).getFileChannel().write(data, offset);
                 if (len < 0) {
-                    throw new HyracksDataException("Error writing to file: " + fHandle.getFileReference().toString());
+                    throw new HyracksDataException("Error writing to file: "
+                            + ((FileHandle) fHandle).getFileReference().toString());
                 }
                 remaining -= len;
                 offset += len;
@@ -95,12 +96,12 @@
     }
 
     @Override
-    public int syncRead(FileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+    public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
         try {
             int n = 0;
             int remaining = data.remaining();
             while (remaining > 0) {
-                int len = fHandle.getFileChannel().read(data, offset);
+                int len = ((FileHandle) fHandle).getFileChannel().read(data, offset);
                 if (len < 0) {
                     return -1;
                 }
@@ -117,23 +118,23 @@
     }
 
     @Override
-    public IIOFuture asyncWrite(FileHandle fHandle, long offset, ByteBuffer data) {
-        AsyncWriteRequest req = new AsyncWriteRequest(fHandle, offset, data);
+    public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) {
+        AsyncWriteRequest req = new AsyncWriteRequest((FileHandle) fHandle, offset, data);
         executor.execute(req);
         return req;
     }
 
     @Override
-    public IIOFuture asyncRead(FileHandle fHandle, long offset, ByteBuffer data) {
-        AsyncReadRequest req = new AsyncReadRequest(fHandle, offset, data);
+    public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) {
+        AsyncReadRequest req = new AsyncReadRequest((FileHandle) fHandle, offset, data);
         executor.execute(req);
         return req;
     }
 
     @Override
-    public void close(FileHandle fHandle) throws HyracksDataException {
+    public void close(IFileHandle fHandle) throws HyracksDataException {
         try {
-            fHandle.close();
+            ((FileHandle) fHandle).close();
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
@@ -225,4 +226,22 @@
             return syncWrite(fHandle, offset, data);
         }
     }
+
+    /**
+     * Forces pending updates of the given file to storage by delegating to {@link FileHandle#sync(boolean)}.
+     *
+     * @param fileHandle handle to flush; it is cast to {@link FileHandle}, so other implementations are rejected
+     *                   with a {@link ClassCastException}
+     * @param metadata   forwarded unchanged to {@link FileHandle#sync(boolean)}
+     * @throws HyracksDataException wrapping any {@link IOException} raised while syncing
+     */
+    @Override
+    public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
+        final FileHandle concreteHandle = (FileHandle) fileHandle;
+        try {
+            concreteHandle.sync(metadata);
+        } catch (IOException ioFailure) {
+            throw new HyracksDataException(ioFailure);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
index 97f82d9..e75fe1c 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -20,8 +20,8 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileHandle;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -54,7 +54,7 @@
             @Override
             public void run() {
                 try {
-                    FileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
+                    IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
                             IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
                         writer.open();
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 1a1b15f..1f19cbe 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -23,8 +23,8 @@
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileHandle;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
@@ -45,7 +45,7 @@
 
     private FileReference fRef;
 
-    private FileHandle handle;
+    private IFileHandle handle;
 
     private long size;
 
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 143fd2c..08f6ea4 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -23,8 +23,8 @@
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileHandle;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -48,7 +48,7 @@
 
     private FileReference fRef;
 
-    private FileHandle handle;
+    private IFileHandle handle;
 
     private long size;
 
@@ -77,7 +77,7 @@
             @Override
             public void run() {
                 try {
-                    FileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                    IFileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
                             IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
                         writer.open();
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 92dc0b2..09dca9e 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -89,7 +89,6 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        ensureConnected();
         failed = true;
         if (delegate != null) {
             delegate.fail();
@@ -98,10 +97,10 @@
 
     @Override
     public void close() throws HyracksDataException {
-        ensureConnected();
         if (!failed) {
+            ensureConnected();
             manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+            delegate.close();
         }
-        delegate.close();
     }
 }
\ No newline at end of file