Skip to content

Commit

Permalink
refactor(android): guard executor submit with try-catch blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
abhaysood committed Aug 1, 2024
1 parent 5e30f59 commit cfaa82a
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ internal class MeasureInitializerImpl(
executorServiceRegistry.ioExecutor(),
),
override val userAttributeProcessor: UserAttributeProcessor = UserAttributeProcessor(
logger,
prefsStorage,
executorServiceRegistry.ioExecutor(),
),
Expand Down Expand Up @@ -222,6 +223,7 @@ internal class MeasureInitializerImpl(
batchCreator = batchCreator,
),
private val exceptionExporter: ExceptionExporter = ExceptionExporterImpl(
logger = logger,
exportExecutor = executorServiceRegistry.eventExportExecutor(),
eventExporter = eventExporter,
),
Expand Down Expand Up @@ -279,6 +281,7 @@ internal class MeasureInitializerImpl(
systemServiceProvider = systemServiceProvider,
),
override val appExitCollector: AppExitCollector = AppExitCollector(
logger = logger,
appExitProvider = appExitProvider,
ioExecutor = executorServiceRegistry.ioExecutor(),
eventProcessor = eventProcessor,
Expand All @@ -294,6 +297,7 @@ internal class MeasureInitializerImpl(
defaultExecutor = executorServiceRegistry.defaultExecutor(),
),
override val memoryUsageCollector: MemoryUsageCollector = MemoryUsageCollector(
logger = logger,
eventProcessor = eventProcessor,
timeProvider = timeProvider,
defaultExecutor = executorServiceRegistry.defaultExecutor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import sh.measure.android.SessionManager
import sh.measure.android.events.EventProcessor
import sh.measure.android.events.EventType
import sh.measure.android.executors.MeasureExecutorService
import sh.measure.android.logger.LogLevel
import sh.measure.android.logger.Logger
import java.util.concurrent.RejectedExecutionException

internal class AppExitCollector(
private val logger: Logger,
private val appExitProvider: AppExitProvider,
private val ioExecutor: MeasureExecutorService,
private val eventProcessor: EventProcessor,
Expand All @@ -17,25 +21,29 @@ internal class AppExitCollector(
}

private fun trackAppExit() {
ioExecutor.submit {
val appExits = appExitProvider.get()
if (appExits.isNullOrEmpty()) {
return@submit emptyList<Triple<Int, String, AppExit>>()
}
val pidsToSessionsMap: Map<Int, List<String>> = sessionManager.getSessionsWithUntrackedAppExit()
val appExitsToTrack = mapAppExitsToSession(pidsToSessionsMap, appExits)
markSessionsAsCrashedByAppExitReason(appExitsToTrack)
appExitsToTrack.forEach {
eventProcessor.track(
data = it.third,
// For app exit, the time at which the app exited is more relevant
// than the current time.
timestamp = it.third.app_exit_time_ms,
type = EventType.APP_EXIT,
sessionId = it.second,
)
sessionManager.updateAppExitTracked(pid = it.first)
try {
ioExecutor.submit {
val appExits = appExitProvider.get()
if (appExits.isNullOrEmpty()) {
return@submit emptyList<Triple<Int, String, AppExit>>()
}
val pidsToSessionsMap: Map<Int, List<String>> = sessionManager.getSessionsWithUntrackedAppExit()
val appExitsToTrack = mapAppExitsToSession(pidsToSessionsMap, appExits)
markSessionsAsCrashedByAppExitReason(appExitsToTrack)
appExitsToTrack.forEach {
eventProcessor.track(
data = it.third,
// For app exit, the time at which the app exited is more relevant
// than the current time.
timestamp = it.third.app_exit_time_ms,
type = EventType.APP_EXIT,
sessionId = it.second,
)
sessionManager.updateAppExitTracked(pid = it.first)
}
}
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit app exit tracking task to executor", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package sh.measure.android.attributes

import sh.measure.android.attributes.Attribute.USER_ID_KEY
import sh.measure.android.executors.MeasureExecutorService
import sh.measure.android.logger.LogLevel
import sh.measure.android.logger.Logger
import sh.measure.android.storage.PrefsStorage
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicBoolean

/**
* Maintains the state for the user ID attribute. The user ID is set by the SDK user and can change
* during the session. This class returns the latest user ID set by the user.
*/
internal class UserAttributeProcessor(
private val logger: Logger,
private val prefsStorage: PrefsStorage,
private val ioExecutor: MeasureExecutorService,
) : AttributeProcessor {
Expand All @@ -34,8 +38,12 @@ internal class UserAttributeProcessor(

fun clearUserId() {
userId = null
ioExecutor.submit {
prefsStorage.setUserId(null)
try {
ioExecutor.submit {
prefsStorage.setUserId(null)
}
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit clear user id task to executor", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import sh.measure.android.logger.Logger
import sh.measure.android.storage.Database
import sh.measure.android.utils.isLowerCase
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicBoolean

internal interface UserDefinedAttribute {
Expand Down Expand Up @@ -44,8 +45,12 @@ internal class UserDefinedAttributeImpl(
if (validate(key, value)) {
attributes[key] = value
if (store) {
ioExecutor.submit {
database.insertUserDefinedAttribute(key, value)
try {
ioExecutor.submit {
database.insertUserDefinedAttribute(key, value)
}
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit insert user defined attribute task to executor", e)
}
}
}
Expand All @@ -55,8 +60,12 @@ internal class UserDefinedAttributeImpl(
if (validate(key, value)) {
attributes[key] = value
if (store) {
ioExecutor.submit {
database.insertUserDefinedAttribute(key, value)
try {
ioExecutor.submit {
database.insertUserDefinedAttribute(key, value)
}
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit insert user defined attribute task to executor", e)
}
}
}
Expand All @@ -81,13 +90,19 @@ internal class UserDefinedAttributeImpl(
LogLevel.Warning,
"Unable to remove attribute: $key, as it does not exist",
)
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit remove user defined attribute task to executor", e)
}
}

override fun clear() {
attributes.clear()
ioExecutor.submit {
database.clearUserDefinedAttributes()
try {
ioExecutor.submit {
database.clearUserDefinedAttributes()
}
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit clear user defined attributes task to executor", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import sh.measure.android.storage.EventStore
import sh.measure.android.tracing.InternalTrace
import sh.measure.android.utils.IdProvider
import sh.measure.android.utils.iso8601Timestamp
import java.util.concurrent.RejectedExecutionException

/**
* An interface for processing events. It is responsible for tracking events, processing them
Expand Down Expand Up @@ -177,30 +178,37 @@ internal class EventProcessorImpl(
label = { "msr-track-event" },
block = {
val threadName = Thread.currentThread().name
defaultExecutor.submit {
val event = createEvent(
data = data,
timestamp = timestamp,
type = type,
attachments = attachments,
attributes = attributes,
userTriggered = userTriggered,
sessionId = sessionId,
)
applyAttributes(event, threadName)
val transformedEvent = InternalTrace.trace(
label = { "msr-transform-event" },
block = { eventTransformer.transform(event) },
)

if (transformedEvent != null) {
InternalTrace.trace(label = { "msr-store-event" }, block = {
eventStore.store(event)
logger.log(LogLevel.Debug, "Event processed: $type, ${event.sessionId}")
})
} else {
logger.log(LogLevel.Debug, "Event dropped: $type")
try {
defaultExecutor.submit {
val event = createEvent(
data = data,
timestamp = timestamp,
type = type,
attachments = attachments,
attributes = attributes,
userTriggered = userTriggered,
sessionId = sessionId,
)
applyAttributes(event, threadName)
val transformedEvent = InternalTrace.trace(
label = { "msr-transform-event" },
block = { eventTransformer.transform(event) },
)

if (transformedEvent != null) {
InternalTrace.trace(label = { "msr-store-event" }, block = {
eventStore.store(event)
logger.log(
LogLevel.Debug,
"Event processed: $type, ${event.sessionId}",
)
})
} else {
logger.log(LogLevel.Debug, "Event dropped: $type")
}
}
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit event processing task to executor", e)
}
},
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sh.measure.android.exporter

import sh.measure.android.executors.MeasureExecutorService
import sh.measure.android.logger.LogLevel
import sh.measure.android.logger.Logger
import java.util.concurrent.RejectedExecutionException

/**
* An interface which allows exporting events to server when an exception occurs.
Expand All @@ -16,14 +19,19 @@ internal interface ExceptionExporter {
* not been exported yet, including the exception.
*/
internal class ExceptionExporterImpl(
private val logger: Logger,
private val eventExporter: EventExporter,
private val exportExecutor: MeasureExecutorService,
) : ExceptionExporter {
override fun export(sessionId: String) {
exportExecutor.submit {
eventExporter.createBatch(sessionId)?.let {
eventExporter.export(it.batchId, it.eventIds)
try {
exportExecutor.submit {
eventExporter.createBatch(sessionId)?.let {
eventExporter.export(it.batchId, it.eventIds)
}
}
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit exception export task to executor", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ internal class HeartbeatImpl(
if (future != null) {
return
}
try {
future = scheduler.scheduleAtFixedRate(
future = try {
scheduler.scheduleAtFixedRate(
{
listeners.forEach(HeartbeatListener::pulse)
},
Expand All @@ -50,6 +50,7 @@ internal class HeartbeatImpl(
)
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to start ExportHeartbeat", e)
return
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import sh.measure.android.executors.MeasureExecutorService
import sh.measure.android.logger.LogLevel
import sh.measure.android.logger.Logger
import sh.measure.android.utils.TimeProvider
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicBoolean

internal interface PeriodicEventExporter {
Expand Down Expand Up @@ -62,12 +63,17 @@ internal class PeriodicEventExporterImpl(
return
}

exportExecutor.submit {
try {
processBatches()
} finally {
isExportInProgress.set(false)
try {
exportExecutor.submit {
try {
processBatches()
} finally {
isExportInProgress.set(false)
}
}
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to submit export task to executor", e)
isExportInProgress.set(false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import sh.measure.android.utils.ProcProviderImpl
import sh.measure.android.utils.ProcessInfoProvider
import sh.measure.android.utils.TimeProvider
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

internal const val CPU_TRACKING_INTERVAL_MS = 3000L
Expand All @@ -36,14 +37,19 @@ internal class CpuUsageCollector(
fun register() {
if (!processInfo.isForegroundProcess()) return
if (future != null) return
future = defaultExecutor.scheduleAtFixedRate(
{
trackCpuUsage()
},
0,
CPU_TRACKING_INTERVAL_MS,
TimeUnit.MILLISECONDS,
)
future = try {
defaultExecutor.scheduleAtFixedRate(
{
trackCpuUsage()
},
0,
CPU_TRACKING_INTERVAL_MS,
TimeUnit.MILLISECONDS,
)
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to start CpuUsageCollector", e)
null
}
}

fun resume() {
Expand Down
Loading

0 comments on commit cfaa82a

Please sign in to comment.