Skip to content

Commit

Permalink
disable EventSubscriber by default to save on RPCs
Browse files Browse the repository at this point in the history
  • Loading branch information
wphan committed Dec 1, 2023
1 parent da2c214 commit 5e20636
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 22 deletions.
1 change: 1 addition & 0 deletions example.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ global:
#forceDeposit: 1000

websocket: true # use websocket for account loading and events (limited support)
eventSubscriber: false # disables event subscriber (heavy RPC demand), this is primary used for counting fills
runOnce: false # Set true to run once and exit, useful for testing or one off bot runs
debug: false # Enable debug logging

Expand Down
6 changes: 2 additions & 4 deletions src/bots/filler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,7 @@ export class FillerBot implements Bot {
}

public async init() {
logger.info(`${this.name} initing`);

logger.info(`${this.name} initing userMap`);
logger.info(`${this.name} Initializing userMap...`);
const startInitUserMap = Date.now();
this.userStatsMap = new UserStatsMap(this.driftClient);
this.userMap = new UserMap(
Expand All @@ -536,7 +534,7 @@ export class FillerBot implements Bot {
await this.userStatsMap.sync(this.userMap!.getUniqueAuthorities());

logger.info(
`Initialize userMap size: ${this.userMap.size()}, userStatsMap: ${this.userStatsMap.size()}, took: ${
`Initialized userMap size: ${this.userMap.size()}, userStatsMap: ${this.userStatsMap.size()}, took: ${
Date.now() - startInitUserMap
} ms`
);
Expand Down
17 changes: 12 additions & 5 deletions src/bots/spotFiller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export class SpotFillerBot implements Bot {

private slotSubscriber: SlotSubscriber;
private driftClient: DriftClient;
private eventSubscriber: EventSubscriber;
private eventSubscriber?: EventSubscriber;
private pollingIntervalMs: number;
private driftLutAccount?: AddressLookupTableAccount;
private driftSpotLutAccount?: AddressLookupTableAccount;
Expand All @@ -159,6 +159,7 @@ export class SpotFillerBot implements Bot {
private triggeringNodes = new Map<string, number>();

private priorityFeeCalculator: PriorityFeeCalculator;
private revertOnFailure: boolean;

// metrics
private metricsInitialized = false;
Expand All @@ -184,9 +185,9 @@ export class SpotFillerBot implements Bot {
slotSubscriber: SlotSubscriber,
driftClient: DriftClient,
userMap: UserMap,
eventSubscriber: EventSubscriber,
runtimeSpec: RuntimeSpec,
config: FillerConfig
config: FillerConfig,
eventSubscriber?: EventSubscriber
) {
this.name = config.botId;
this.dryRun = config.dryRun;
Expand All @@ -212,6 +213,8 @@ export class SpotFillerBot implements Bot {
}

this.priorityFeeCalculator = new PriorityFeeCalculator(Date.now());
this.revertOnFailure = config.revertOnFailure ?? true;
logger.info(`${this.name}: revertOnFailure: ${this.revertOnFailure}`);
}

private initializeMetrics() {
Expand Down Expand Up @@ -462,7 +465,7 @@ export class SpotFillerBot implements Bot {
}
this.intervalIds = [];

this.eventSubscriber.eventEmitter.removeAllListeners('newEvent');
this.eventSubscriber?.eventEmitter.removeAllListeners('newEvent');

await this.dlobSubscriber!.unsubscribe();
await this.userStatsMap!.unsubscribe();
Expand All @@ -484,7 +487,7 @@ export class SpotFillerBot implements Bot {
);
this.intervalIds.push(intervalId);

this.eventSubscriber.eventEmitter.on(
this.eventSubscriber?.eventEmitter.on(
'newEvent',
async (record: WrappedEvent<any>) => {
await this.userMap!.updateWithEventRecord(record);
Expand Down Expand Up @@ -1023,6 +1026,10 @@ export class SpotFillerBot implements Bot {
)
);

if (this.revertOnFailure) {
ixs.push(await this.driftClient.getRevertFillIx());
}

const txStart = Date.now();
const lutAccounts: Array<AddressLookupTableAccount> = [];
this.driftLutAccount && lutAccounts.push(this.driftLutAccount);
Expand Down
3 changes: 3 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export interface GlobalConfig {
closeOpenPositions?: boolean;
forceDeposit?: number | null;
websocket?: boolean;
eventSubscriber?: false;
runOnce?: boolean;
debug?: boolean;
subaccounts?: Array<number>;
Expand Down Expand Up @@ -100,6 +101,7 @@ const defaultConfig: Partial<Config> = {
closeOpenPositions: false,
forceDeposit: null,
websocket: false,
eventSubscriber: false,
runOnce: false,
debug: false,
subaccounts: [0],
Expand Down Expand Up @@ -193,6 +195,7 @@ export function loadConfigFromOpts(opts: any): Config {
closeOpenPositions: opts.closeOpenPositions ?? false,
forceDeposit: opts.forceDeposit ?? null,
websocket: opts.websocket ?? false,
eventSubscriber: opts.eventSubscriber ?? false,
runOnce: opts.runOnce ?? false,
debug: opts.debug ?? false,
subaccounts: loadCommaDelimitToArray(opts.subaccount),
Expand Down
34 changes: 21 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ program
'--use-jito',
'Submit transactions to a Jito relayer if the bot supports it'
)
.option(
'--event-susbcriber',
'Explicitly intialize an eventSubscriber (RPC heavy'
)
.parse();

const opts = program.opts();
Expand Down Expand Up @@ -265,14 +269,17 @@ const runBot = async () => {
txSender,
});

const eventSubscriber = new EventSubscriber(connection, driftClient.program, {
maxTx: 4096,
maxEventsPerType: 4096,
orderBy: 'blockchain', // Possible options are 'blockchain' or 'client'
orderDir: 'desc',
commitment: stateCommitment,
logProviderConfig,
});
let eventSubscriber: EventSubscriber | undefined = undefined;
if (config.global.eventSubscriber) {
eventSubscriber = new EventSubscriber(connection, driftClient.program, {
maxTx: 4096,
maxEventsPerType: 4096,
orderBy: 'blockchain', // Possible options are 'blockchain' or 'client'
orderDir: 'desc',
commitment: stateCommitment,
logProviderConfig,
});
}

const slotSubscriber = new SlotSubscriber(connection, {});
const startupTime = Date.now();
Expand Down Expand Up @@ -301,9 +308,10 @@ const runBot = async () => {
await sleepMs(1000);
}
const driftUser = driftClient.getUser();
const subscribePromises = configHasBot(config, 'fillerLite')
? [driftUser.subscribe()]
: [driftUser.subscribe(), eventSubscriber.subscribe()];
const subscribePromises = [driftUser.subscribe()];
if (eventSubscriber !== undefined && !configHasBot(config, 'fillerLite')) {
subscribePromises.push(eventSubscriber.subscribe());
}
await waitForAllSubscribesToFinish(subscribePromises);

driftClient.eventEmitter.on('error', (e) => {
Expand Down Expand Up @@ -499,15 +507,15 @@ const runBot = async () => {
slotSubscriber,
driftClient,
userMap,
eventSubscriber,
{
rpcEndpoint: endpoint,
commit: commitHash,
driftEnv: config.global.driftEnv!,
driftPid: driftPublicKey.toBase58(),
walletAuthority: wallet.publicKey.toBase58(),
},
config.botConfigs!.spotFiller!
config.botConfigs!.spotFiller!,
eventSubscriber
)
);
}
Expand Down

0 comments on commit 5e20636

Please sign in to comment.