Skip to content

Commit

Permalink
Fixed several bugs in BetterSeekableBufferStream.fillBuffer() which l…
Browse files Browse the repository at this point in the history
…ead to buffer inconsistent updates (including java.util.zip.DataFormatException: invalid distance too far back #39)
  • Loading branch information
iromeo committed Apr 27, 2018
1 parent 73ce9be commit 4a4f199
Show file tree
Hide file tree
Showing 3 changed files with 467 additions and 194 deletions.
142 changes: 102 additions & 40 deletions src/main/kotlin/org/jetbrains/bio/BetterSeekableBufferedStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import kotlin.math.min
*/
open class BetterSeekableBufferedStream(
private val stream: SeekableStream,
bufferSize: Int = DEFAULT_BUFFER_SIZE
bufferSize: Int = DEFAULT_BUFFER_SIZE,
val doubleBuffer: Boolean = true
) : SeekableStream() {

var position: Long = 0
Expand Down Expand Up @@ -44,6 +45,7 @@ open class BetterSeekableBufferedStream(
buffers[curBufIdx()] = value
}

// Actual buffer size is up to this value
var bufferSize: Int
get() = buffer!!.size
set(bufferSize) {
Expand Down Expand Up @@ -80,24 +82,26 @@ open class BetterSeekableBufferedStream(
override fun read(buffer: ByteArray, offset: Int, length: Int): Int {
val initialPos = position
val requestEndOffset = initialPos + length
val cachedStart = bufferStartOffset
val cachedEnd = bufferEndOffset

var readBytes = 0
var dstOffset = offset // last written offset in requested buffer
if (requestEndOffset >= bufferEndOffset) {
if (position in bufferStartOffset until bufferEndOffset) {
if (requestEndOffset >= cachedEnd) {
if (position in cachedStart until cachedEnd) {
// requested buffer prefix intersection: copy prefix and proceed
val inCBuffPos = (position - bufferStartOffset).toInt()
val count = (bufferEndOffset - bufferStartOffset).toInt() - inCBuffPos
val inCBuffPos = (position - cachedStart).toInt()
val count = (cachedEnd - cachedStart).toInt() - inCBuffPos
System.arraycopy(this.buffer!!, inCBuffPos, buffer, dstOffset, count)
readBytes = count
dstOffset += count
// move position
seek(position + count)
}
} else if (requestEndOffset >= bufferStartOffset) {
} else if (requestEndOffset >= cachedStart) {
// requested buffer suffix intersection: copy prefix and proceed
val inCBuffPos = max(0, (position - bufferStartOffset).toInt())
val count = (requestEndOffset - bufferStartOffset).toInt() - inCBuffPos
val inCBuffPos = max(0, (position - cachedStart).toInt())
val count = (requestEndOffset - cachedStart).toInt() - inCBuffPos
System.arraycopy(this.buffer!!, inCBuffPos, buffer, dstOffset + length - count, count)
readBytes = count
// do not move position
Expand All @@ -106,11 +110,12 @@ open class BetterSeekableBufferedStream(
while (readBytes < length) {
fillBuffer()

if (bufferEndOffset == -1L) {
val newCachedEnd = bufferEndOffset
if (newCachedEnd == -1L) {
// eof
break
}
val available = (bufferEndOffset - position).toInt()
val available = (newCachedEnd - position).toInt()
val remaining = length - readBytes
val count = min(remaining, available)
val inBuffPos = Ints.checkedCast(position - bufferStartOffset)
Expand All @@ -125,11 +130,14 @@ open class BetterSeekableBufferedStream(
}

override fun read(): Int {
if (bufferEndOffset == -1L) {
val cachedStart = bufferStartOffset
val cachedEnd = bufferEndOffset

if (cachedEnd == -1L) {
return -1 // eof
}

if (position !in bufferStartOffset until bufferEndOffset) {
if (position !in cachedStart until cachedEnd) {
fillBuffer()
if (bufferEndOffset == -1L) {
return -1 // eof
Expand All @@ -142,56 +150,110 @@ open class BetterSeekableBufferedStream(
}

internal fun switchBuffersIfNeeded() {
val buffSize = buffer!!.size
val curBuffRange = bufferStartOffset until bufferEndOffset
// if current buffer intersects next one - do nothing
// action required if next buffer is not intersecting current
// otherwise impl will be too complicated
if (position in curBuffRange || (position + buffSize) in curBuffRange) {
// do nothing
if (!doubleBuffer) {
return
}

// switch buffers:
useSndBuffer = !useSndBuffer
// cache: --------
// opt1 : *****
// opt2 : -----
// opt3 : -----
// opt4 : ------
// opt5 : *****
// opt6 : ----------------
val cachedStart = bufferStartOffset
val cachedEnd = bufferEndOffset
val maxBuffSize = buffer!!.size
val newEnd = position + maxBuffSize

if ((position < cachedStart && newEnd <= cachedStart) // new buffer before cache
|| (position >= cachedEnd)) { // new buffer after cache

// switch buffers:
useSndBuffer = !useSndBuffer
}
}

protected open fun fillBuffer() {
protected open fun fillBuffer(): Int {
switchBuffersIfNeeded()

val buff = buffer!!
val buffSize = buff.size
val maxBuffSize = buff.size

val cachedStart = bufferStartOffset
val cachedEnd = bufferEndOffset

var actuallyReadBytes = 0
val desiredNextEndOffset = position + maxBuffSize
when {
position >= bufferEndOffset -> {
position >= cachedEnd -> {
// cache: -------
// opt1 : ??????????

// doesn't intersect existing buffer
fetchNewBuffer(position, 0, buffSize)
actuallyReadBytes = fetchNewBuffer(position, 0, maxBuffSize)
}
position >= bufferStartOffset -> {

position >= cachedStart -> {
// cache: -------
// opt1 : -----???????
// opt2 : ----

// double buffer was switched to the other one containing data
// do nothing
}
position + buffSize > bufferStartOffset -> {
// copy intersecting part
val count = (position + buffSize - bufferStartOffset).toInt()
System.arraycopy(buff, 0, buff, buffSize - count, count)

// fetch remaining part
fetchNewBuffer(position, 0, buffSize - count)
check(bufferEndOffset != -1L) {
"Read backward, cannot be eof. Position $position, $count"

desiredNextEndOffset > cachedStart -> {
// cache: -------
// opt1 : ???????-----
// opt2 : ??-------???

// copy intersecting part: Let's copy only left part and don't try to read small left
// part. Better to read left part next time as a large buffer.
val count = (minOf(desiredNextEndOffset, cachedEnd) - cachedStart).toInt()

val remainingPrefixLength = (cachedStart - position).toInt()
System.arraycopy(buff, 0, buff, remainingPrefixLength, count)

// fetch remaining part at left side
try {
actuallyReadBytes = fetchNewBuffer(position, 0, remainingPrefixLength)
check(bufferEndOffset != -1L) {
"Read backward, cannot be eof. Position $position, $count"
}

if (actuallyReadBytes == remainingPrefixLength) {
// forward already read part
bufferEndOffset += count
}
// else we have a gap => trim buffer and ignore shared count
} catch (e: Exception) {
// we cannot left buffer in corrupted state, let's drop it.
// Also we have to break invariant that position is in buffer, seems
// it isn't an issue because we rethrow error
//
// alternative version is to copy common part in temporary buffer
// then fetch new buffer and finally copy temp buffer to ours buffer
// Seems this case is a rare on in general, so let's just drop buffer

// that position is in buffer after filling buffer, so we have not o
bufferStartOffset = 0
bufferEndOffset = 0
throw e
}
// forward already read part
bufferEndOffset += count
}
else -> {
// cache: -------
// opt1 : ??????????

// doesn't intersect existing buffer
fetchNewBuffer(position, 0, buffSize)
actuallyReadBytes = fetchNewBuffer(position, 0, maxBuffSize)
}
}
return actuallyReadBytes

}

protected open fun fetchNewBuffer(pos: Long, buffOffset: Int, size: Int): Int {
internal open fun fetchNewBuffer(pos: Long, buffOffset: Int, size: Int): Int {
stream.seek(pos)

if (size == 0) {
Expand Down
42 changes: 15 additions & 27 deletions src/main/kotlin/org/jetbrains/bio/RomBuffer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import org.jetbrains.bio.big.RTreeIndex
import java.io.Closeable
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.zip.DataFormatException
import java.util.zip.Inflater

/** A read-only buffer. */
Expand Down Expand Up @@ -123,35 +122,24 @@ abstract class RomBuffer : Closeable {
inflater.reset()
inflater.setInput(compressedBuf)
val sizeInt = size.toInt()
val step = sizeInt
var remaining = sizeInt
val maxUncompressedChunk = if (uncompressBufSize == 0) sizeInt else uncompressBufSize
try {
while (remaining > 0) {
uncompressedBuf = Bytes.ensureCapacity(
uncompressedBuf,
uncompressedSize + maxUncompressedChunk,
maxUncompressedChunk / 2 // 1.5x
)

// start next chunk if smth remains
if (inflater.finished()) {
inflater.reset()
inflater.setInput(compressedBuf, sizeInt - remaining, remaining)
}

val actual = inflater.inflate(uncompressedBuf, uncompressedSize, maxUncompressedChunk)
remaining = inflater.remaining
uncompressedSize += actual
while (remaining > 0) {
uncompressedBuf = Bytes.ensureCapacity(
uncompressedBuf,
uncompressedSize + maxUncompressedChunk,
maxUncompressedChunk / 2 // 1.5x
)

// start next chunk if smth remains
if (inflater.finished()) {
inflater.reset()
inflater.setInput(compressedBuf, sizeInt - remaining, remaining)
}
} catch (e: DataFormatException) {
val msg = """[@${Thread.currentThread().id}] java.util.zip.DataFormatException: ${e.message}
| offset=$offset, limit=$limit, size=$size, compressed size=${compressedBuf.size}
| remaining=$remaining, uncompressedSize=$uncompressedSize
| uncompressBufSize=$uncompressBufSize
|""".trimMargin()
org.apache.log4j.Logger.getRootLogger().error(msg)
throw e

val actual = inflater.inflate(uncompressedBuf, uncompressedSize, maxUncompressedChunk)
remaining = inflater.remaining
uncompressedSize += actual
}
// Not obligatory, but let's left thread local variable in clean state
inflater.reset()
Expand Down
Loading

0 comments on commit 4a4f199

Please sign in to comment.