Skip to content

Commit

Permalink
AVRO-2511: add case for sync (apache#2392)
Browse files Browse the repository at this point in the history
* AVRO-2511: add case for sync
  • Loading branch information
clesaec authored and Ranbir Kumar committed May 13, 2024
1 parent af029d3 commit f84d501
Showing 1 changed file with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.Flushable;
import java.io.IOException;
Expand Down Expand Up @@ -51,7 +52,7 @@
* <i>blocks</i>. A synchronization marker is written between blocks, so that
* files may be split. Blocks may be compressed. Extensible metadata is stored
* at the end of the file. Files may be appended to.
*
*
* @see DataFileReader
*/
public class DataFileWriter<D> implements Closeable, Flushable {
Expand Down Expand Up @@ -181,7 +182,7 @@ public DataFileWriter<D> create(Schema schema, OutputStream outs, byte[] sync) t
* sync marker is written. By default, the writer will flush the buffer each
* time a sync marker is written (if the block size limit is reached or the
* {@linkplain #sync()} is called.
*
*
* @param flushOnEveryBlock - If set to false, this writer will not flush the
* block to the stream until {@linkplain #flush()} is
* explicitly called.
Expand Down Expand Up @@ -211,7 +212,7 @@ public DataFileWriter<D> appendTo(File file) throws IOException {
/**
* Open a writer appending to an existing file. <strong>Since 1.9.0 this method
* does not close in.</strong>
*
*
* @param in reading the existing file.
* @param out positioned at the end of the existing file.
*/
Expand Down Expand Up @@ -304,7 +305,7 @@ public AppendWriteException(Exception e) {

/**
* Append a datum to the file.
*
*
* @see AppendWriteException
*/
public void append(D datum) throws IOException {
Expand Down Expand Up @@ -365,7 +366,7 @@ private void writeIfBlockFull() throws IOException {
* at compression level 7. If <i>recompress</i> is false, blocks will be copied
* without changing the compression level. If true, they will be converted to
* the new compression level.
*
*
* @param otherFile
* @param recompress
* @throws IOException
Expand Down Expand Up @@ -439,17 +440,19 @@ public void flush() throws IOException {
}

/**
* If this writer was instantiated using a File or using an
* {@linkplain Syncable} instance, this method flushes all buffers for this
* writer to disk. In other cases, this method behaves exactly like
* {@linkplain #flush()}.
* If this writer was instantiated using a {@linkplain File},
* {@linkplain FileOutputStream} or {@linkplain Syncable} instance, this method
* flushes all buffers for this writer to disk. In other cases, this method
* behaves exactly like {@linkplain #flush()}.
*
* @throws IOException
*/
public void fSync() throws IOException {
flush();
if (underlyingStream instanceof Syncable) {
((Syncable) underlyingStream).sync();
} else if (underlyingStream instanceof FileOutputStream) {
((FileOutputStream) underlyingStream).getFD().sync();
}
}

Expand Down

0 comments on commit f84d501

Please sign in to comment.