Skip to content

Commit

Permalink
feat: Improved filtering for platform players with multiple states
Browse files Browse the repository at this point in the history
* Add session key dervied from source data to Play and PlayerState
* Discriminate states based on a session key, if possible
  * Log which states are being dropped based on session key (plex)
* Add session key to important logging (new player play, in UI for player info)
  • Loading branch information
FoxxMD committed Oct 29, 2024
1 parent 93125dd commit 3ed6840
Show file tree
Hide file tree
Showing 17 changed files with 120 additions and 74 deletions.
8 changes: 4 additions & 4 deletions docsite/docs/configuration/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ Find your [**Plex Token**](https://support.plex.tv/articles/204059436-finding-an

:::warning[Deprecated]

Multi-scrobbler < 0.9.0 used [webhooks](https://support.plex.tv/articles/115002267687-webhooks) to support Plex scrobbling. This approach has been deprecated in favor of using Plex's API directly which has many benefits including **not requiring Plex Pass.**
Multi-scrobbler < 0.8.7 used [webhooks](https://support.plex.tv/articles/115002267687-webhooks) to support Plex scrobbling. This approach has been deprecated in favor of using Plex's API directly which has many benefits including **not requiring Plex Pass.**

:::

Expand All @@ -312,9 +312,9 @@ Multi-scrobbler < 0.9.0 used [webhooks](https://support.plex.tv/articles/1150022

* Follow the instructions in the API tab
* The `user` (`PLEX_USER`) setting has been renamed `usersAllow` (`PLEX_USERS_ALLOW`)
* If you were using this filter to ensure only scrobbles from yourself were registered then you no longer need this setting -- by default MS will only scrobble for the user the Plex Token is used for.
* The `servers` setting is no longer available as MS only scrobbles from the server using the API anyways.
* If you need to scrobble for multiple servers set up each server as a separate Plex API source
* If you were using this filter to ensure only scrobbles from yourself were registered then you no longer need this setting -- by default MS will only scrobble for the user the Plex Token is from.
* The `servers` setting is no longer available as MS only scrobbles from the server the Plex token is from.
* If you need to scrobble for multiple servers set up each server as a separate Plex API source with a separate token.
* The `libraries` setting has been renamed to `librariesAllow`

</details>
Expand Down
2 changes: 2 additions & 0 deletions src/backend/common/infrastructure/Atomic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ export interface PlayerStateData extends PlayerStateDataMaybePlay {

export interface PlayerStateDataMaybePlay {
platformId: PlayPlatformId
/** The ID/Key for individual sessions on a device/platform */
sessionId?: string
play?: PlayObject
status?: ReportedPlayerStatus
position?: number
Expand Down
3 changes: 1 addition & 2 deletions src/backend/common/vendor/ListenbrainzApiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import { stringSameness } from '@foxxmd/string-sameness';
import dayjs from "dayjs";
import request, { Request, Response } from 'superagent';
import { PlayObject } from "../../../core/Atomic.js";
import { slice } from "../../../core/StringUtils.js";
import { combinePartsToString } from "../../utils.js";
import { combinePartsToString, slice } from "../../../core/StringUtils.js";
import {
findDelimiters,
normalizeStr,
Expand Down
4 changes: 2 additions & 2 deletions src/backend/sources/JellyfinApiSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import { nanoid } from "nanoid";
import pEvent from "p-event";
import { Simulate } from "react-dom/test-utils";
import { PlayObject } from "../../core/Atomic.js";
import { buildTrackString, truncateStringToLength } from "../../core/StringUtils.js";
import { buildTrackString, combinePartsToString, truncateStringToLength } from "../../core/StringUtils.js";
import {
FormatPlayObjectOptions,
InternalConfig,
Expand All @@ -50,7 +50,7 @@ import {
PlayPlatformId, REPORTED_PLAYER_STATUSES
} from "../common/infrastructure/Atomic.js";
import { JellyApiSourceConfig } from "../common/infrastructure/config/source/jellyfin.js";
import { combinePartsToString, genGroupIdStr, getPlatformIdFromData, joinedUrl, parseBool, } from "../utils.js";
import { genGroupIdStr, getPlatformIdFromData, joinedUrl, parseBool, } from "../utils.js";
import { parseArrayFromMaybeString } from "../utils/StringUtils.js";
import { MemoryPositionalSource } from "./MemoryPositionalSource.js";
import { FixedSizeList } from "fixed-size-list";
Expand Down
8 changes: 6 additions & 2 deletions src/backend/sources/JellyfinSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ import { Logger } from "@foxxmd/logging";
import dayjs from "dayjs";
import EventEmitter from "events";
import { PlayObject, TA_CLOSE } from "../../core/Atomic.js";
import { buildTrackString, splitByFirstFound, truncateStringToLength } from "../../core/StringUtils.js";
import {
buildTrackString,
combinePartsToString,
splitByFirstFound,
truncateStringToLength
} from "../../core/StringUtils.js";
import { FormatPlayObjectOptions, InternalConfig, PlayPlatformId } from "../common/infrastructure/Atomic.js";
import { JellySourceConfig } from "../common/infrastructure/config/source/jellyfin.js";
import {
combinePartsToString,
doubleReturnNewline,
parseBool,
parseDurationFromTimestamp,
Expand Down
3 changes: 2 additions & 1 deletion src/backend/sources/MemoryPositionalSource.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Logger } from "@foxxmd/logging";
import { PlayPlatformId } from "../common/infrastructure/Atomic.js";
import { PlayerStateDataMaybePlay, PlayPlatformId } from "../common/infrastructure/Atomic.js";
import MemorySource from "./MemorySource.js";
import { PlayerStateOptions } from "./PlayerState/AbstractPlayerState.js";
import { PositionalPlayerState } from "./PlayerState/PositionalPlayerState.js";
import { PlayObject } from "../../core/Atomic.js";

export class MemoryPositionalSource extends MemorySource {
getNewPlayer = (logger: Logger, id: PlayPlatformId, opts: PlayerStateOptions) => new PositionalPlayerState(logger, id, opts)
Expand Down
12 changes: 8 additions & 4 deletions src/backend/sources/MemorySource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ export default class MemorySource extends AbstractSource {
this.emitEvent('playerDelete', {platformId: id});
}

pickPlatformSession = (sessions: (PlayObject | PlayerStateDataMaybePlay)[], player: AbstractPlayerState): PlayObject | PlayerStateDataMaybePlay => {
if(sessions.length > 1) {
player.logger.debug(`More than one data/state found in incoming data, will only use first found.`);
}
return sessions[0];
}

processRecentPlays = (datas: (PlayObject | PlayerStateDataMaybePlay)[]) => {

const {
Expand Down Expand Up @@ -162,10 +169,7 @@ export default class MemorySource extends AbstractSource {
if (relevantDatas.length > 0) {
this.lastActivityAt = dayjs();

if (relevantDatas.length > 1) {
this.logger.warn(`More than one data/state for Player ${player.platformIdStr} found in incoming data, will only use first found.`);
}
incomingData = relevantDatas[0];
incomingData = this.pickPlatformSession(relevantDatas, player);

let playerState: PlayerStateDataMaybePlay;
if(asPlayerStateDataMaybePlay(incomingData)) {
Expand Down
8 changes: 5 additions & 3 deletions src/backend/sources/PlayerState/AbstractPlayerState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export abstract class AbstractPlayerState {
reportedStatus: ReportedPlayerStatus = REPORTED_PLAYER_STATUSES.unknown
calculatedStatus: CalculatedPlayerStatus = CALCULATED_PLAYER_STATUSES.unknown
platformId: PlayPlatformId
sessionId?: string
stateIntervalOptions: Required<PlayerStateIntervals>;
currentPlay?: PlayObject
playFirstSeenAt?: Dayjs
Expand Down Expand Up @@ -151,11 +152,12 @@ export abstract class AbstractPlayerState {
}

protected setPlay(state: PlayerStateData, reportedTS?: Dayjs): [PlayObject, PlayObject?] {
const {play, status} = state;
const {play, status, sessionId} = state;
this.playLastUpdatedAt = dayjs();
if (status !== undefined) {
this.reportedStatus = status;
}
this.sessionId = sessionId;

if (this.currentPlay !== undefined) {
if (!this.incomingPlayMatchesExisting(play)) { // TODO check new play date and listen range to see if they intersect
Expand Down Expand Up @@ -329,7 +331,7 @@ export abstract class AbstractPlayerState {
this.listenRanges = [];
this.currentListenRange = undefined;

this.logger.debug(`New Play: ${buildTrackString(play, {include: ['trackId', 'artist', 'track']})}`);
this.logger.verbose(`New Play: ${buildTrackString(play, {include: ['trackId', 'artist', 'track', 'session']})}`);

if (status !== undefined) {
this.reportedStatus = status;
Expand All @@ -344,7 +346,7 @@ export abstract class AbstractPlayerState {
const parts = [''];
let play: string;
if (this.currentPlay !== undefined) {
parts.push(`${buildTrackString(this.currentPlay, {include: ['trackId', 'artist', 'track']})} @ ${this.playFirstSeenAt.toISOString()}`);
parts.push(`${buildTrackString(this.currentPlay, {include: ['trackId', 'artist', 'track', 'session']})} @ ${this.playFirstSeenAt.toISOString()}`);
}
parts.push(`Reported: ${this.reportedStatus.toUpperCase()} | Calculated: ${this.calculatedStatus.toUpperCase()} | Stale: ${this.isUpdateStale() ? 'Yes' : 'No'} | Orphaned: ${this.isOrphaned() ? 'Yes' : 'No'} | Last Update: ${this.stateLastUpdatedAt.toISOString()}`);
let progress = '';
Expand Down
43 changes: 32 additions & 11 deletions src/backend/sources/PlexApiSource.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import objectHash from 'object-hash';
import EventEmitter from "events";
import { PlayObject } from "../../core/Atomic.js";
import { buildTrackString, truncateStringToLength } from "../../core/StringUtils.js";
import { buildTrackString, combinePartsToString, truncateStringToLength } from "../../core/StringUtils.js";
import {
asPlayerStateDataMaybePlay,
FormatPlayObjectOptions,
InternalConfig,
PlayerStateData,
PlayerStateDataMaybePlay,
PlayPlatformId, REPORTED_PLAYER_STATUSES
} from "../common/infrastructure/Atomic.js";
import { combinePartsToString, genGroupIdStr, getFirstNonEmptyString, getPlatformIdFromData, joinedUrl, parseBool, } from "../utils.js";
import { parseArrayFromMaybeString } from "../utils/StringUtils.js";
import { genGroupIdStr, getFirstNonEmptyString, getPlatformIdFromData, joinedUrl, parseBool, } from "../utils.js";
import { buildStatePlayerPlayIdententifyingInfo, parseArrayFromMaybeString } from "../utils/StringUtils.js";
import { GetSessionsMetadata } from "@lukehagar/plexjs/sdk/models/operations/getsessions.js";
import { PlexAPI } from "@lukehagar/plexjs";
import {
Expand All @@ -23,7 +24,7 @@ import { GetTokenDetailsResponse, GetTokenDetailsUserPlexAccount } from '@lukeha
import { parseRegexSingle } from '@foxxmd/regex-buddy-core';
import { Readable } from 'node:stream';
import { PlexPlayerState } from './PlayerState/PlexPlayerState.js';
import { PlayerStateOptions } from './PlayerState/AbstractPlayerState.js';
import { AbstractPlayerState, PlayerStateOptions } from './PlayerState/AbstractPlayerState.js';
import { Logger } from '@foxxmd/logging';
import { MemoryPositionalSource } from './MemoryPositionalSource.js';
import { FixedSizeList } from 'fixed-size-list';
Expand Down Expand Up @@ -291,6 +292,7 @@ export default class PlexApiSource extends MemoryPositionalSource {
librarySectionTitle: library,
duration,
guid,
sessionKey,
player: {
product,
title: playerTitle,
Expand Down Expand Up @@ -320,6 +322,7 @@ export default class PlexApiSource extends MemoryPositionalSource {
source: 'Plex',
library,
deviceId: combinePartsToString([shortDeviceId(machineIdentifier), product, playerTitle]),
sessionId: sessionKey,
trackProgressPosition: viewOffset / 1000
}
}
Expand All @@ -329,19 +332,16 @@ export default class PlexApiSource extends MemoryPositionalSource {

const result = await this.plexApi.sessions.getSessions();

const nonMSSessions: [PlayerStateDataMaybePlay, GetSessionsMetadata][] = (result.object.mediaContainer?.metadata ?? [])
const allSessions: [PlayerStateDataMaybePlay, GetSessionsMetadata][] = (result.object.mediaContainer?.metadata ?? [])
.map(x => [this.sessionToPlayerState(x), x]);
const validSessions: PlayerStateDataMaybePlay[] = [];

for(const sessionData of nonMSSessions) {
for(const sessionData of allSessions) {
const validPlay = this.isActivityValid(sessionData[0], sessionData[1]);
if(validPlay === true) {
validSessions.push(sessionData[0]);
} else if(this.logFilterFailure !== false) {
let stateIdentifyingInfo: string = genGroupIdStr(getPlatformIdFromData(sessionData[0]));
if(sessionData[0].play !== undefined) {
stateIdentifyingInfo = buildTrackString(sessionData[0].play, {include: ['artist', 'track', 'platform']});
}
const stateIdentifyingInfo = buildStatePlayerPlayIdententifyingInfo(sessionData[0]);
const dropReason = `Player State for -> ${stateIdentifyingInfo} <-- is being dropped because ${validPlay}`;
if(!this.uniqueDropReasons.data.some(x => x === dropReason)) {
this.logger[this.logFilterFailure](dropReason);
Expand Down Expand Up @@ -370,6 +370,25 @@ export default class PlexApiSource extends MemoryPositionalSource {
}
}

pickPlatformSession = (sessions: (PlayObject | PlayerStateDataMaybePlay)[], player: AbstractPlayerState): PlayObject | PlayerStateDataMaybePlay => {
if(sessions.length === 1) {
return sessions[0];
}
// if all are player states and have session ids
// then choose the player state with the "latest" session key
if(sessions.every(x => asPlayerStateDataMaybePlay(x) && 'sessionId' in x)) {
const pStateSessions = sessions as PlayerStateDataMaybePlay[];
pStateSessions.sort((a, b) => parseInt(a.sessionId) - parseInt(b.sessionId));

const validSession = pStateSessions[sessions.length - 1];
const droppingSessions = pStateSessions.filter(x => x.sessionId !== validSession.sessionId).map(x => buildStatePlayerPlayIdententifyingInfo(x)).join('\n');
player.logger.debug(`More than one data/state found in incoming data, dropping these sessions with "earlier" session keys:\n${droppingSessions}`);

return validSession;
}
return sessions[0];
}

sessionToPlayerState = (obj: GetSessionsMetadata): PlayerStateDataMaybePlay => {

const {
Expand All @@ -379,7 +398,8 @@ export default class PlexApiSource extends MemoryPositionalSource {
product,
title,
state
} = {}
} = {},
sessionKey
} = obj;

const msDeviceId = combinePartsToString([shortDeviceId(machineIdentifier), product, title]);
Expand All @@ -394,6 +414,7 @@ export default class PlexApiSource extends MemoryPositionalSource {
const reportedStatus = state !== 'playing' ? REPORTED_PLAYER_STATUSES.paused : REPORTED_PLAYER_STATUSES.playing;
return {
platformId: [msDeviceId, play.meta.user],
sessionId: sessionKey,
play,
status: reportedStatus,
position: viewOffset / 1000
Expand Down
3 changes: 1 addition & 2 deletions src/backend/sources/PlexSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import EventEmitter from "events";
import formidable, { Files, File } from 'formidable';
import { file } from "jscodeshift";
import { PlayObject } from "../../core/Atomic.js";
import { truncateStringToLength } from "../../core/StringUtils.js";
import { combinePartsToString, truncateStringToLength } from "../../core/StringUtils.js";
import { FormatPlayObjectOptions, InternalConfig, SourceType } from "../common/infrastructure/Atomic.js";
import { PlexSourceConfig } from "../common/infrastructure/config/source/plex.js";
import { combinePartsToString } from "../utils.js";
import { getFileIdentifier, getValidMultipartJsonFile } from "../utils/RequestUtils.js";
import AbstractSource from "./AbstractSource.js";

Expand Down
3 changes: 1 addition & 2 deletions src/backend/sources/SpotifySource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import EventEmitter from "events";
import SpotifyWebApi from "spotify-web-api-node";
import request from 'superagent';
import { PlayObject, SCROBBLE_TS_SOC_END, SCROBBLE_TS_SOC_START, ScrobbleTsSOC } from "../../core/Atomic.js";
import { truncateStringToLength } from "../../core/StringUtils.js";
import { combinePartsToString, truncateStringToLength } from "../../core/StringUtils.js";
import { isNodeNetworkException } from "../common/errors/NodeErrors.js";
import { hasUpstreamError, UpstreamError } from "../common/errors/UpstreamError.js";
import {
Expand All @@ -18,7 +18,6 @@ import {
} from "../common/infrastructure/Atomic.js";
import { SpotifySourceConfig } from "../common/infrastructure/config/source/spotify.js";
import {
combinePartsToString,
joinedUrl,
parseRetryAfterSecsFromObj,
readJson,
Expand Down
3 changes: 1 addition & 2 deletions src/backend/sources/TautulliSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ import dayjs from "dayjs";
import EventEmitter from "events";
import { Request } from "express";
import { PlayObject } from "../../core/Atomic.js";
import { truncateStringToLength } from "../../core/StringUtils.js";
import { combinePartsToString, truncateStringToLength } from "../../core/StringUtils.js";
import { FormatPlayObjectOptions, InternalConfig } from "../common/infrastructure/Atomic.js";
import { TautulliSourceConfig } from "../common/infrastructure/config/source/tautulli.js";
import { combinePartsToString } from "../utils.js";
import PlexSource from "./PlexSource.js";

const shortDeviceId = truncateStringToLength(10, '');
Expand Down
32 changes: 1 addition & 31 deletions src/backend/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ import pathUtil from "path";
import { TimeoutError, WebapiError } from "spotify-web-api-node/src/response-error.js";
import { PlayObject } from "../core/Atomic.js";
import {
asPlayerStateData,
asPlayerStateDataMaybePlay,
NO_DEVICE,
NO_USER,
numberFormatOptions,
PlayerStateData,
PlayerStateDataMaybePlay,
PlayPlatformId,
ProgressAwarePlayObject,
Expand Down Expand Up @@ -365,34 +363,6 @@ export const remoteHostStr = (req: Request): string => {
return `${host}${proxy !== undefined ? ` (${proxy})` : ''}${agent !== undefined ? ` (UA: ${agent})` : ''}`;
}

export const combinePartsToString = (parts: any[], glue: string = '-'): string | undefined => {
const cleanParts: string[] = [];
for (const part of parts) {
if (part === null || part === undefined) {
continue;
}
if (Array.isArray(part)) {
const nestedParts = combinePartsToString(part, glue);
if (nestedParts !== undefined) {
cleanParts.push(nestedParts);
}
} else if (typeof part === 'object') {
// hope this works
cleanParts.push(JSON.stringify(part));
} else if(typeof part === 'string') {
if(part.trim() !== '') {
cleanParts.push(part);
}
} else {
cleanParts.push(part.toString());
}
}
if (cleanParts.length > 0) {
return cleanParts.join(glue);
}
return undefined;
}

/**
* Remove duplicates based on trackId, deviceId, and play date
* */
Expand Down Expand Up @@ -759,4 +729,4 @@ export const getFirstNonEmptyVal = <T = unknown>(values: unknown[], options: {of
return undefined;
}

export const getFirstNonEmptyString = (values: unknown[]) => getFirstNonEmptyVal<string>(values, {ofType: 'string', test: (v) => v.trim() !== ''});
export const getFirstNonEmptyString = (values: unknown[]) => getFirstNonEmptyVal<string>(values, {ofType: 'string', test: (v) => v.trim() !== ''});
13 changes: 11 additions & 2 deletions src/backend/utils/StringUtils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { strategies, stringSameness, StringSamenessResult } from "@foxxmd/string-sameness";
import { PlayObject } from "../../core/Atomic.js";
import { DELIMITERS } from "../common/infrastructure/Atomic.js";
import { parseRegexSingleOrFail } from "../utils.js";
import { asPlayerStateData, DELIMITERS, PlayerStateDataMaybePlay } from "../common/infrastructure/Atomic.js";
import { genGroupIdStr, getPlatformIdFromData, parseRegexSingleOrFail } from "../utils.js";
import { buildTrackString } from "../../core/StringUtils.js";

const {levenStrategy, diceStrategy} = strategies;

Expand Down Expand Up @@ -357,3 +358,11 @@ export const firstNonEmptyStr = (vals: unknown[]): string | undefined => {
}
}
}

export const buildStatePlayerPlayIdententifyingInfo = (data: PlayObject | PlayerStateDataMaybePlay): string => {
let idInfo = genGroupIdStr(getPlatformIdFromData(data));
if(asPlayerStateData(data)) {
idInfo = buildTrackString(data.play, {include: ['artist', 'track', 'platform', 'session']});
}
return idInfo;
}
Loading

0 comments on commit 3ed6840

Please sign in to comment.