Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/recording log version #1647

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 117 additions & 4 deletions aeron-cluster/src/main/java/io/aeron/cluster/RecordingLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.SemanticVersion;
import org.agrona.Strings;
import org.agrona.collections.IntArrayList;
import org.agrona.collections.Long2LongHashMap;
Expand All @@ -30,9 +32,12 @@
import org.agrona.concurrent.UnsafeBuffer;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -45,8 +50,12 @@

import static io.aeron.Aeron.NULL_VALUE;
import static io.aeron.archive.client.AeronArchive.NULL_POSITION;
import static io.aeron.exceptions.AeronException.Category.FATAL;
import static java.lang.Math.max;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.COPY_ATTRIBUTES;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.nio.file.StandardOpenOption.*;
import static org.agrona.BitUtil.*;

Expand All @@ -60,11 +69,25 @@
* that a snapshot is taken midterm and therefore the latest state is the snapshot plus the log of messages which
* got appended to the log after the snapshot was taken.
* <p>
* Record layout as follows:
* Recording log header as follows:
* <pre>
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | 0xFFA3F010 (Magic Number) |
* | 0x00000000 |
* +---------------------------------------------------------------+
* | Version |
* +---------------------------------------------------------------+
* | Reserved (52 bytes) ...
* ... |
* +---------------------------------------------------------------+
* </pre>
* Recording log entry as follows:
* <pre>
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +---------------------------------------------------------------+
* | Recording ID |
* | |
* +---------------------------------------------------------------+
Expand Down Expand Up @@ -100,6 +123,15 @@
*/
public final class RecordingLog implements AutoCloseable
{
static final long MAGIC_NUMBER = 0xFFA3F010_00000000L;
static final int HEADER_SIZE = 64;
static final int MAJOR_VERSION = 0;
static final int MINOR_VERSION = 1;
static final int PATCH_VERSION = 0;
static final int SEMANTIC_VERSION = SemanticVersion.compose(MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION);
private static final int MAGIC_NUMBER_OFFSET = 0;
private static final int VERSION_OFFSET = MAGIC_NUMBER_OFFSET + SIZE_OF_LONG;

/**
* Representation of the entry in the {@link RecordingLog}.
*/
Expand Down Expand Up @@ -627,6 +659,9 @@ public String toString()
*/
public static final String RECORDING_LOG_FILE_NAME = "recording.log";

static final String RECORDING_LOG_MIGRATED_FILE_NAME = RECORDING_LOG_FILE_NAME + ".migrated";
static final String RECORDING_LOG_NEW_FILE_NAME = RECORDING_LOG_FILE_NAME + ".new";

/**
* The log entry is for a recording of messages within a leadership term to the log.
*/
Expand Down Expand Up @@ -756,20 +791,31 @@ public RecordingLog(final File parentDir, final boolean createNew)

try
{
if (!isNewFile)
{
checkForVersionAndMigrate(logFile);
}

fileChannel = FileChannel.open(logFile.toPath(), openOptions);

if (isNewFile)
{
syncDirectory(parentDir);
writeHeader(fileChannel);
}
else
{
if (isMagicNumberInvalid(fileChannel))
{
throw new ClusterException("Failed to migrate the recording log, the header is corrupted", FATAL);
}

reload();
}
}
catch (final IOException ex)
{
throw new ClusterException(ex);
throw new ClusterException("Failed to migrate the recording log, I/O error occurred", ex, FATAL);
}
}

Expand Down Expand Up @@ -836,8 +882,8 @@ public void reload()

try
{
long filePosition = 0;
long consumePosition = 0;
long filePosition = HEADER_SIZE;
long consumePosition = filePosition;

while (true)
{
Expand Down Expand Up @@ -1807,6 +1853,73 @@ private int captureEntriesFromBuffer(
return consumed;
}

private void applyHeader(final MutableDirectBuffer header)
{
header.putLong(MAGIC_NUMBER_OFFSET, MAGIC_NUMBER, LITTLE_ENDIAN);
header.putInt(VERSION_OFFSET, SEMANTIC_VERSION, LITTLE_ENDIAN);
}

private void writeHeader(final FileChannel fileChannel) throws IOException
{
final ByteBuffer headerBuffer = ByteBuffer.allocateDirect(HEADER_SIZE);
final MutableDirectBuffer header = new UnsafeBuffer(headerBuffer);
applyHeader(header);

if (HEADER_SIZE != fileChannel.write(headerBuffer))
{
throw new ClusterException("Failed to write full header recording log header", FATAL);
}
}

private void checkForVersionAndMigrate(final File logFile) throws IOException
{
if (requiresMigration(logFile))
{
final File oldMigratedFile = new File(logFile.getParentFile(), RECORDING_LOG_MIGRATED_FILE_NAME);
Files.copy(logFile.toPath(), oldMigratedFile.toPath(), COPY_ATTRIBUTES, REPLACE_EXISTING);

final File newFile = new File(logFile.getParentFile(), RECORDING_LOG_NEW_FILE_NAME);
final Path newPath = newFile.toPath();
Files.deleteIfExists(newPath);

final MutableDirectBuffer header = new UnsafeBuffer(new byte[HEADER_SIZE]);
applyHeader(header);

try (FileOutputStream outputStream = new FileOutputStream(newFile, false))
{
outputStream.write(header.byteArray());
Files.copy(oldMigratedFile.toPath(), outputStream);
}

final Path destinationPath = new File(logFile.getParentFile(), RECORDING_LOG_FILE_NAME).toPath();
Files.move(newPath, destinationPath, ATOMIC_MOVE, REPLACE_EXISTING);
}
}

private boolean requiresMigration(final File logFile) throws IOException
{
if (logFile.length() < HEADER_SIZE)
{
return true;
}

try (FileChannel fileChannel = FileChannel.open(logFile.toPath(), READ))
{
return isMagicNumberInvalid(fileChannel);
}
}

private static boolean isMagicNumberInvalid(final FileChannel fileChannel) throws IOException
{
final DirectBuffer header = new UnsafeBuffer(ByteBuffer.allocateDirect(HEADER_SIZE));
if (HEADER_SIZE != fileChannel.read(header.byteBuffer()))
{
throw new IOException("Unable to read header");
}
final long magicNumber = header.getLong(0, LITTLE_ENDIAN);
return magicNumber != MAGIC_NUMBER;
}

private static void syncDirectory(final File dir)
{
try (FileChannel fileChannel = FileChannel.open(dir.toPath()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ void entriesInTheRecordingLogShouldBeSorted()

recordingLog.reload();

assertEquals(sortedList.size(), recordingLog.entries().size());
assertEquals(sortedList, recordingLog.entries()); // reload from disc and re-sort
}
}
Expand Down
Loading