Skip to content

Commit

Permalink
Complete deserialization logic
Browse files Browse the repository at this point in the history
  • Loading branch information
stoyicker committed Nov 13, 2023
1 parent 9e32c58 commit 273eae2
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@ internal class NtpExchanger(
ntpVersion: UByte,
): NtpExchangeResult? {
val ntpUdpSocketOperations = NtpUdpSocketOperations()
val requestPacket = NtpPacket(
versionNumber = ntpVersion.toInt(),
mode = NTP_MODE_CLIENT,
)
return try {
ntpUdpSocketOperations.prepareSocket(queryTimeout.inWholeMilliseconds)
val ntpPacket = NtpPacket(versionNumber = ntpVersion.toInt(), mode = NTP_MODE_CLIENT)
val requestTime = referenceClock.referenceEpochTime
val buffer = ntpPacketSerializer(requestPacket.copy(transmitEpochTimestamp = requestTime))
ntpPacket.transmitEpochTimestamp = requestTime
val buffer = ntpPacketSerializer(ntpPacket)
ntpUdpSocketOperations.exchangePacketInPlace(
buffer,
address,
NTP_PORT_NUMBER,
)
val responseTime = referenceClock.referenceEpochTime - requestTime
NtpExchangeResult(responseTime, ntpPacketDeserializer(buffer))
ntpPacketDeserializer(buffer)?.let { NtpExchangeResult(responseTime, it) }
} catch (_: Throwable) {
null
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
package com.tidal.networktime.internal

import kotlin.time.Duration
import kotlin.time.Duration.Companion.days

internal data class NtpPacket(
val leapIndicator: Int = 0,
val versionNumber: Int,
val mode: Int,
val stratum: Byte = 0,
val poll: Byte = 0,
val precision: Byte = 0,
val stratum: Int = 0,
val poll: Duration = Duration.INFINITE,
val precision: Duration = Duration.INFINITE,
val rootDelay: Duration = Duration.INFINITE,
val rootDispersion: Duration = Duration.INFINITE,
val referenceIdentifier: Int = 0,
val referenceIdentifier: String = "",
val referenceEpochTimestamp: Duration = Duration.INFINITE,
val originateEpochTimestamp: Duration = Duration.INFINITE,
val receiveEpochTimestamp: Duration = Duration.INFINITE,
val transmitEpochTimestamp: Duration = Duration.INFINITE,
/** Keep this mutable to minimize delay (avoids an allocation) **/
var transmitEpochTimestamp: Duration = Duration.INFINITE,
) {
init {
// Check sizes of fields whose type does not match their corresponding size in the actual packet
check(leapIndicator <= 0b0011)
check(versionNumber <= 0b0111)
check(mode <= 0b0111)
}

companion object {
val NTP_EPOCH_OFFSET_WITH_EPOCH = (365.days * 70 + 17.days)
val NTP_TIMESTAMP_BASE_WITH_EPOCH_MSB_0_MILLISECONDS = 2085978496000
val NTP_TIMESTAMP_BASE_WITH_EPOCH_MSB_1_MILLISECONDS = -2085978496000
}
}
Original file line number Diff line number Diff line change
@@ -1,73 +1,102 @@
package com.tidal.networktime.internal

import kotlin.math.pow
import kotlin.math.roundToLong
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

internal class NtpPacketDeserializer {
operator fun invoke(bytes: ByteArray): NtpPacket {
operator fun invoke(bytes: ByteArray): NtpPacket? {
var index = 0
val leapIndicator = (bytes[index].toInt() shr 6) and 0b11
if (leapIndicator == LEAP_INDICATOR_CLOCK_UNSYNCHRONIZED) {
return null
}
val versionNumber = (bytes[index].toInt() shr 3) and 0b111
val mode = bytes[index].toInt() and 0b111
if (mode != MODE_SERVER) {
return null
}
++index
val stratum = bytes[index++].asUnsignedInt
if (stratum >= STRATUM_CLOCK_NOT_SYNCHRONIZED) {
return null
}
val poll = bytes[index++].asSignedIntToThePowerOf2.seconds
val precision = bytes[index++].asSignedIntToThePowerOf2.milliseconds
val rootDelay = bytes.sliceArray(index until index + 4).asNtpIntervalToInterval
index += 4
val rootDispersion = bytes.sliceArray(index until index + 4).asNtpIntervalToInterval
index += 4
val referenceIdentifier = bytes.sliceArray(index until index + 4).decodeToString()
index += 4
val reference = bytes.sliceArray(index until index + 8).asNtpEpochTimestampToEpochTime
index += 8
val originate = bytes.sliceArray(index until index + 8).asNtpEpochTimestampToEpochTime
index += 8
val receive = bytes.sliceArray(index until index + 8).asNtpEpochTimestampToEpochTime
index += 8
val transmit = bytes.sliceArray(index until index + 8).asNtpEpochTimestampToEpochTime
return NtpPacket(
(bytes[index++].toInt() shl 8) + bytes[index++],
(bytes[index++].toInt() shl 16) + (bytes[index++].toInt() shl 24) + bytes[index++],
(bytes[index++].toInt() shl 16) + (bytes[index++].toInt() shl 24) + bytes[index++],
bytes[index++],
bytes[index++],
bytes[index++],
bytes.sliceArray(index until index + 32).asNtpIntervalToInterval.also { index += 32 },
bytes.sliceArray(index until index + 32).asNtpIntervalToInterval.also { index += 32 },
(bytes[index++].toInt() shl 24) +
(bytes[index++].toInt() shl 16) +
(bytes[index++].toInt() shl 8) +
bytes[index++].toInt(),
bytes.sliceArray(index until index + 64).asNtpEpochTimestampToEpochTime
.also { index += 64 },
bytes.sliceArray(index until index + 64).asNtpEpochTimestampToEpochTime
.also { index += 64 },
bytes.sliceArray(index until index + 64).asNtpEpochTimestampToEpochTime
.also { index += 64 },
bytes.sliceArray(index until index + 64).asNtpEpochTimestampToEpochTime,
leapIndicator,
versionNumber,
mode,
stratum,
poll,
precision,
rootDelay,
rootDispersion,
referenceIdentifier,
reference,
originate,
receive,
transmit,
)
}

private val Byte.asSignedIntToThePowerOf2
get() = 2.toDouble().pow(toInt())

private val Byte.asUnsignedInt: Int
get() = toUByte().toInt()

private val ByteArray.asNtpIntervalToInterval: Duration
get() {
var index = 0
val seconds = (this[index++].toUByte().toInt() shl 8) +
this[index++].toUByte().toInt()
val fraction = (
(this[index++].toUByte().toInt() shl 8) +
this[index].toUByte().toInt()
) *
1_000 /
(1 shl 16)
val seconds = (this[index++].asUnsignedInt shl 8) + this[index++].asUnsignedInt
val fraction = ((this[index++].asUnsignedInt shl 8) + this[index].asUnsignedInt)
.toDouble() / (1 shl 16) * 1_000
return seconds.seconds + fraction.milliseconds
}

private val Byte.asUnsignedLong: Long
get() = toUByte().toLong()

private val ByteArray.asNtpEpochTimestampToEpochTime: Duration
get() {
val rollOverAdjustment = if ((this[0].toInt() shr 7) == 0) {
2.toDouble().pow(32).seconds
} else {
Duration.ZERO
}
var index = 0
val seconds = (this[index++].toUByte().toInt() shl 24) +
(this[index++].toUByte().toInt() shl 16) +
(this[index++].toUByte().toInt() shl 8) +
this[index++].toUByte().toInt()
val fraction = (
(this[index++].toUByte().toInt() shl 24) +
(this[index++].toUByte().toInt() shl 16) +
(this[index++].toUByte().toInt() shl 8) +
this[index].toUByte().toInt()
) *
1_000 /
0b100000000000000000000000000000000
return seconds.seconds +
rollOverAdjustment -
NtpPacket.NTP_EPOCH_OFFSET_WITH_EPOCH +
fraction.milliseconds
val value = (this[index++].asUnsignedLong shl 56) or
(this[index++].asUnsignedLong shl 48) or
(this[index++].asUnsignedLong shl 40) or
(this[index++].asUnsignedLong shl 32) or
(this[index++].asUnsignedLong shl 24) or
(this[index++].asUnsignedLong shl 16) or
(this[index++].asUnsignedLong shl 8) or
this[index].asUnsignedLong
val seconds = ((value ushr 32) and 0xffffffff)
val milliseconds =
((value and 0xffffffff).toDouble() / 0x100000000 * 1000).roundToLong()
return if (seconds and 0x80000000L == 0L) {
NtpPacket.NTP_TIMESTAMP_BASE_WITH_EPOCH_MSB_0_MILLISECONDS
} else {
NtpPacket.NTP_TIMESTAMP_BASE_WITH_EPOCH_MSB_1_MILLISECONDS
}.milliseconds + seconds.seconds + milliseconds.milliseconds
}

companion object {
private const val LEAP_INDICATOR_CLOCK_UNSYNCHRONIZED = 0b11
private const val MODE_SERVER = 4
private const val STRATUM_CLOCK_NOT_SYNCHRONIZED = 16
}
}
Original file line number Diff line number Diff line change
@@ -1,61 +1,43 @@
package com.tidal.networktime.internal

import kotlin.random.Random
import kotlin.time.Duration

internal class NtpPacketSerializer(private val random: Random) {
operator fun invoke(ntpPacket: NtpPacket) = with(ntpPacket) {
byteArrayOf(
((leapIndicator shl 6) or (versionNumber shl 3) or mode).toByte(),
stratum,
poll,
precision,
*rootDelay.asIntervalToNtpInterval,
*rootDispersion.asIntervalToNtpInterval,
(referenceIdentifier shr 24).toByte(),
(referenceIdentifier shr 16).toByte(),
(referenceIdentifier shr 8).toByte(),
referenceIdentifier.toByte(),
*referenceEpochTimestamp.asEpochTimestampToNtpEpochTimestamp,
*originateEpochTimestamp.asEpochTimestampToNtpEpochTimestamp,
*receiveEpochTimestamp.asEpochTimestampToNtpEpochTimestamp,
*transmitEpochTimestamp.asEpochTimestampToNtpEpochTimestamp,
)
}

private val Duration.asIntervalToNtpInterval: ByteArray
get() {
if (this == Duration.INFINITE) {
return ByteArray(4)
internal class NtpPacketSerializer {
operator fun invoke(ntpPacket: NtpPacket) = ntpPacket.run {
ByteArray(48).apply {
set(0, ((0 shl 6) or (versionNumber shl 3) or mode).toByte())
transmitEpochTimestamp.asNtpByteArray.forEachIndexed { i, it ->
set(40 + i, it)
}
val wholeSeconds = inWholeSeconds
val fraction = (inWholeMilliseconds - wholeSeconds * 1_000) * (1 shl 16) / 1_000
return byteArrayOf(
(wholeSeconds shr 8).toByte(),
wholeSeconds.toByte(),
(fraction shr 8).toByte(),
fraction.toByte(),
)
}
}

private val Duration.asEpochTimestampToNtpEpochTimestamp: ByteArray
private val Duration.asNtpByteArray: ByteArray
get() {
if (this == Duration.INFINITE) {
return ByteArray(8)
val millis = inWholeMilliseconds
val useBase1 = millis < NtpPacket.NTP_TIMESTAMP_BASE_WITH_EPOCH_MSB_0_MILLISECONDS
val baseTimeMillis = millis -
if (useBase1) {
NtpPacket.NTP_TIMESTAMP_BASE_WITH_EPOCH_MSB_1_MILLISECONDS
} else {
NtpPacket.NTP_TIMESTAMP_BASE_WITH_EPOCH_MSB_0_MILLISECONDS
}
var seconds = baseTimeMillis / 1_000
if (useBase1) {
seconds = seconds or 0x80000000L
}
val fraction = baseTimeMillis % 1_000 * 0x100000000L / 1_000
return (seconds shl 32 or fraction).run {
byteArrayOf(
(this shr 56 and 0xff).toByte(),
(this shr 48 and 0xff).toByte(),
(this shr 40 and 0xff).toByte(),
(this shr 32 and 0xff).toByte(),
(this shr 24 and 0xff).toByte(),
(this shr 16 and 0xff).toByte(),
(this shr 8 and 0xff).toByte(),
(this and 0xff).toByte(),
)
}
val seconds = (this + NtpPacket.NTP_EPOCH_OFFSET_WITH_EPOCH).inWholeSeconds
val fraction = (inWholeMilliseconds - inWholeSeconds * 1_000) *
0b100000000000000000000000000000000 /
1_000
return byteArrayOf(
(seconds shr 24).toByte(),
(seconds shr 16).toByte(),
(seconds shr 8).toByte(),
seconds.toByte(),
(fraction shr 24).toByte(),
(fraction shr 16).toByte(),
(fraction shr 8).toByte(),
random.nextBytes(1).single(),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ package com.tidal.networktime.internal

import com.tidal.networktime.NTPServer
import kotlinx.coroutines.delay
import kotlin.random.Random
import kotlin.time.Duration

internal class SyncPeriodic(
private val ntpServers: Iterable<NTPServer>,
private val syncInterval: Duration,
private val referenceClock: KotlinXDateTimeSystemClock,
private val mutableState: MutableState,
random: Random = Random.Default,
private val ntpExchanger: NtpExchanger = NtpExchanger(
referenceClock,
NtpPacketSerializer(random),
NtpPacketSerializer(),
NtpPacketDeserializer(),
),
) {
Expand Down

0 comments on commit 273eae2

Please sign in to comment.