Skip to content

Commit

Permalink
chore: move perpetual notifier watcher to vaultManager
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris-Hibbert committed Jan 5, 2024
1 parent 70dccf5 commit 935cbf6
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 39 deletions.
40 changes: 34 additions & 6 deletions packages/inter-protocol/src/vaultFactory/vaultManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ import {
RatioShape,
} from '@agoric/ertp';
import { makeTracer } from '@agoric/internal';
import {
makeStoredNotifier,
observeNotifier,
watchPerpetualNotifier,
} from '@agoric/notifier';
import { makeStoredNotifier, observeNotifier } from '@agoric/notifier';
import { appendToStoredArray } from '@agoric/store/src/stores/store-utils.js';
import {
M,
Expand Down Expand Up @@ -73,6 +69,38 @@ const { details: X, Fail, quote: q } = assert;

const trace = makeTracer('VM');

/**
* Watch a notifier that isn't expected to fail or finish unless the vat hosting
* the notifier is upgraded. This watcher supports that by providing a
* straightforward way to get a replacement if the notifier breaks.
*
* @template T notifier topic
* @template {any[]} [A=unknown[]] arbitrary arguments
* @param {ERef<LatestTopic<T>>} notifierP
* @param {import('@agoric/swingset-liveslots').PromiseWatcher<T, A>} watcher
* @param {A} args
*/
export const watchQuoteNotifier = async (notifierP, watcher, ...args) => {
await undefined;

let updateCount;
for (;;) {
let value;
try {
({ value, updateCount } = await E(notifierP).getUpdateSince(updateCount));
watcher.onFulfilled && watcher.onFulfilled(value, ...args);
} catch (e) {
watcher.onRejected && watcher.onRejected(e, ...args);
break;
}
if (updateCount === undefined) {
watcher.onRejected &&
watcher.onRejected(Error('stream finished'), ...args);
break;
}
}
};

/** @typedef {import('./storeUtils.js').NormalizedDebt} NormalizedDebt */
/** @typedef {import('@agoric/time').RelativeTime} RelativeTime */

Expand Down Expand Up @@ -427,7 +455,7 @@ export const prepareVaultManagerKit = (
facets.helper.observeQuoteNotifier();
},
});
void watchPerpetualNotifier(quoteNotifier, quoteWatcher);
void watchQuoteNotifier(quoteNotifier, quoteWatcher);
},
/** @param {Timestamp} updateTime */
async chargeAllVaults(updateTime) {
Expand Down
32 changes: 0 additions & 32 deletions packages/notifier/src/asyncIterableAdaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,35 +103,3 @@ export const observeIteration = (asyncIterableP, iterationObserver) => {
*/
export const observeNotifier = (notifierP, iterationObserver) =>
observeIteration(subscribeLatest(notifierP), iterationObserver);

/**
* Watch an ephemeral notifier, and recover if it fails due to upgrade or other
* reasons. Since it's expected to be a perpetual notifier, the watcher's
* onRejected will also be called if the notifier calls finish().
*
* @template T notifier topic
* @template {any[]} [A=unknown[]] arbitrary arguments
* @param {ERef<LatestTopic<T>>} notifierP
* @param {import('@agoric/swingset-liveslots').PromiseWatcher<T, A>} watcher
* @param {A} args
*/
export const watchPerpetualNotifier = async (notifierP, watcher, ...args) => {
await undefined;

let updateCount;
for (;;) {
let value;
try {
({ value, updateCount } = await E(notifierP).getUpdateSince(updateCount));
watcher.onFulfilled && watcher.onFulfilled(value, ...args);
} catch (e) {
watcher.onRejected && watcher.onRejected(e, ...args);
break;
}
if (updateCount === undefined) {
watcher.onRejected &&
watcher.onRejected(Error('stream finished'), ...args);
break;
}
}
};
1 change: 0 additions & 1 deletion packages/notifier/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ export {
observeNotifier,
observeIterator,
observeIteration,
watchPerpetualNotifier,
// deprecated, consider removing
makeAsyncIterableFromNotifier,
} from './asyncIterableAdaptor.js';
Expand Down

0 comments on commit 935cbf6

Please sign in to comment.