-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add message pickup module #1413
Changes from all commits
4201d39
fcb4d56
5123b37
7447f2c
0919f42
acf79db
7a634a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import type { | ||
PickupMessagesOptions, | ||
PickupMessagesReturnType, | ||
QueueMessageOptions, | ||
QueueMessageReturnType, | ||
} from './MessagePickupApiOptions' | ||
import type { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol' | ||
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' | ||
import type { MessageRepository } from '../../storage/MessageRepository' | ||
|
||
import { AgentContext } from '../../agent' | ||
import { MessageSender } from '../../agent/MessageSender' | ||
import { OutboundMessageContext } from '../../agent/models' | ||
import { InjectionSymbols } from '../../constants' | ||
import { AriesFrameworkError } from '../../error' | ||
import { injectable } from '../../plugins' | ||
import { ConnectionService } from '../connections/services' | ||
|
||
import { MessagePickupModuleConfig } from './MessagePickupModuleConfig' | ||
|
||
export interface MessagePickupApi<MPPs extends MessagePickupProtocol[]> { | ||
queueMessage(options: QueueMessageOptions): Promise<QueueMessageReturnType> | ||
pickupMessages(options: PickupMessagesOptions<MPPs>): Promise<PickupMessagesReturnType> | ||
} | ||
|
||
@injectable() | ||
export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessagePickupProtocol, V2MessagePickupProtocol]> | ||
implements MessagePickupApi<MPPs> | ||
{ | ||
public config: MessagePickupModuleConfig<MPPs> | ||
|
||
private messageSender: MessageSender | ||
private agentContext: AgentContext | ||
private connectionService: ConnectionService | ||
|
||
public constructor( | ||
messageSender: MessageSender, | ||
agentContext: AgentContext, | ||
connectionService: ConnectionService, | ||
config: MessagePickupModuleConfig<MPPs> | ||
) { | ||
this.messageSender = messageSender | ||
this.connectionService = connectionService | ||
this.agentContext = agentContext | ||
this.config = config | ||
} | ||
|
||
private getProtocol<MPP extends MPPs[number]['version']>(protocolVersion: MPP): MessagePickupProtocol { | ||
const protocol = this.config.protocols.find((protocol) => protocol.version === protocolVersion) | ||
|
||
if (!protocol) { | ||
throw new AriesFrameworkError(`No message pickup protocol registered for protocol version ${protocolVersion}`) | ||
} | ||
|
||
return protocol | ||
} | ||
|
||
/** | ||
* Add an encrypted message to the message pickup queue | ||
* | ||
* @param options: connectionId associated to the message and the encrypted message itself | ||
*/ | ||
public async queueMessage(options: QueueMessageOptions): Promise<QueueMessageReturnType> { | ||
const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId) | ||
|
||
const messageRepository = this.agentContext.dependencyManager.resolve<MessageRepository>( | ||
InjectionSymbols.MessageRepository | ||
) | ||
|
||
await messageRepository.add(connectionRecord.id, options.message) | ||
} | ||
|
||
/** | ||
* Pickup queued messages from a message holder. It attempts to retrieve all current messages from the | ||
* queue, receiving up to `batchSize` messages per batch retrieval. | ||
* | ||
* @param options connectionId, protocol version to use and batch size | ||
*/ | ||
public async pickupMessages(options: PickupMessagesOptions<MPPs>): Promise<PickupMessagesReturnType> { | ||
const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId) | ||
|
||
const protocol = this.getProtocol(options.protocolVersion) | ||
const { message } = await protocol.pickupMessages(this.agentContext, { | ||
connectionRecord, | ||
batchSize: options.batchSize, | ||
recipientKey: options.recipientKey, | ||
}) | ||
|
||
await this.messageSender.sendMessage( | ||
new OutboundMessageContext(message, { | ||
agentContext: this.agentContext, | ||
connection: connectionRecord, | ||
}) | ||
) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' | ||
import type { EncryptedMessage } from '../../types' | ||
|
||
/** | ||
* Get the supported protocol versions based on the provided discover features services. | ||
*/ | ||
export type MessagePickupProtocolVersionType<MPPs extends MessagePickupProtocol[]> = MPPs[number]['version'] | ||
|
||
export interface QueueMessageOptions { | ||
connectionId: string | ||
message: EncryptedMessage | ||
} | ||
|
||
export interface PickupMessagesOptions<MPPs extends MessagePickupProtocol[] = MessagePickupProtocol[]> { | ||
connectionId: string | ||
protocolVersion: MessagePickupProtocolVersionType<MPPs> | ||
recipientKey?: string | ||
batchSize?: number | ||
} | ||
|
||
export type QueueMessageReturnType = void | ||
|
||
export type PickupMessagesReturnType = void |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import type { MessagePickupModuleConfigOptions } from './MessagePickupModuleConfig' | ||
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' | ||
import type { FeatureRegistry } from '../../agent/FeatureRegistry' | ||
import type { ApiModule, DependencyManager } from '../../plugins' | ||
import type { Optional } from '../../utils' | ||
import type { Constructor } from '../../utils/mixins' | ||
|
||
import { InjectionSymbols } from '../../constants' | ||
|
||
import { MessagePickupApi } from './MessagePickupApi' | ||
import { MessagePickupModuleConfig } from './MessagePickupModuleConfig' | ||
import { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol' | ||
|
||
/** | ||
* Default protocols that will be registered if the `protocols` property is not configured. | ||
*/ | ||
export type DefaultMessagePickupProtocols = [V1MessagePickupProtocol, V2MessagePickupProtocol] | ||
|
||
// MessagePickupModuleOptions makes the protocols property optional from the config, as it will set it when not provided. | ||
export type MessagePickupModuleOptions<MessagePickupProtocols extends MessagePickupProtocol[]> = Optional< | ||
MessagePickupModuleConfigOptions<MessagePickupProtocols>, | ||
'protocols' | ||
> | ||
|
||
export class MessagePickupModule<MessagePickupProtocols extends MessagePickupProtocol[] = DefaultMessagePickupProtocols> | ||
implements ApiModule | ||
{ | ||
public readonly config: MessagePickupModuleConfig<MessagePickupProtocols> | ||
|
||
// Infer Api type from the config | ||
public readonly api: Constructor<MessagePickupApi<MessagePickupProtocols>> = MessagePickupApi | ||
|
||
public constructor(config?: MessagePickupModuleOptions<MessagePickupProtocols>) { | ||
this.config = new MessagePickupModuleConfig({ | ||
...config, | ||
protocols: config?.protocols ?? [new V1MessagePickupProtocol(), new V2MessagePickupProtocol()], | ||
}) as MessagePickupModuleConfig<MessagePickupProtocols> | ||
} | ||
|
||
/** | ||
* Registers the dependencies of the question answer module on the dependency manager. | ||
*/ | ||
public register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry) { | ||
// Api | ||
dependencyManager.registerContextScoped(MessagePickupApi) | ||
|
||
// Config | ||
dependencyManager.registerInstance(MessagePickupModuleConfig, this.config) | ||
|
||
// Message repository | ||
if (this.config.messageRepository) { | ||
dependencyManager.registerInstance(InjectionSymbols.MessageRepository, this.config.messageRepository) | ||
} | ||
|
||
// Protocol needs to register feature registry items and handlers | ||
for (const protocol of this.config.protocols) { | ||
protocol.register(dependencyManager, featureRegistry) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' | ||
import type { MessageRepository } from '../../storage/MessageRepository' | ||
|
||
/** | ||
* MessagePickupModuleConfigOptions defines the interface for the options of the MessagePickupModuleConfig class. | ||
* This can contain optional parameters that have default values in the config class itself. | ||
*/ | ||
export interface MessagePickupModuleConfigOptions<MessagePickupProtocols extends MessagePickupProtocol[]> { | ||
/** | ||
* Maximum number of messages to retrieve in a single batch message pickup | ||
* | ||
* @default 10 | ||
*/ | ||
maximumBatchSize?: number | ||
|
||
/** | ||
* Message pickup protocols to make available to the message pickup module. Only one protocol should be registered for each | ||
* protocol version. | ||
* | ||
* When not provided, V1MessagePickupProtocol and V2MessagePickupProtocol` are registered by default. | ||
* | ||
* @default | ||
* ``` | ||
* [V1MessagePickupProtocol, V2MessagePickupProtocol] | ||
* ``` | ||
*/ | ||
protocols: MessagePickupProtocols | ||
|
||
/** | ||
* Allows to specify a custom pickup message queue. It defaults to an in-memory repository | ||
* | ||
*/ | ||
messageRepository?: MessageRepository | ||
Comment on lines
+29
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious what the line is between mediator vs message pickup. Is the mediator just about routing, and is message pickup module about queuing messages? As you mentioned before, allowing messages to be queued even without mediator (so just e.g. issuer that queues messages for the holder directly) should be possible, in which case this approach makes sense. Does the message pickup module still support There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have to dig a little bit more about it but I initially I see "Message Pickup" mainly dedicated to the explicit pickup and setting up the pickup mode in general. Although the "live mode" can be also seen as an implicit mode, it can probably also handled by this module (maybe using a specific queue as suggested by @rodolfomiranda in hyperledger/aries-rfcs#760). The implicit mode is tricky because to me it's like a particular case that works for ACA-Py but don't see it specified in an RFC. Maybe it can still be triggered from the Mediation Recipient module (as it is being done right now) or left it to be handled from outside of the framework: for instance, we can add |
||
} | ||
|
||
export class MessagePickupModuleConfig<MessagePickupProtocols extends MessagePickupProtocol[]> { | ||
private options: MessagePickupModuleConfigOptions<MessagePickupProtocols> | ||
|
||
public constructor(options: MessagePickupModuleConfigOptions<MessagePickupProtocols>) { | ||
this.options = options | ||
} | ||
|
||
/** See {@link MessagePickupModuleConfig.maximumBatchSize} */ | ||
public get maximumBatchSize() { | ||
return this.options.maximumBatchSize ?? 10 | ||
} | ||
|
||
/** See {@link MessagePickupModuleConfig.protocols} */ | ||
public get protocols() { | ||
return this.options.protocols | ||
} | ||
|
||
/** See {@link MessagePickupModuleConfig.protocols} */ | ||
public get messageRepository() { | ||
return this.options.messageRepository | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import { FeatureRegistry } from '../../../agent/FeatureRegistry' | ||
import { Protocol } from '../../../agent/models' | ||
import { DependencyManager } from '../../../plugins/DependencyManager' | ||
import { MessagePickupApi } from '../MessagePickupApi' | ||
import { MessagePickupModule } from '../MessagePickupModule' | ||
import { MessagePickupModuleConfig } from '../MessagePickupModuleConfig' | ||
|
||
jest.mock('../../../plugins/DependencyManager') | ||
const DependencyManagerMock = DependencyManager as jest.Mock<DependencyManager> | ||
|
||
jest.mock('../../../agent/FeatureRegistry') | ||
const FeatureRegistryMock = FeatureRegistry as jest.Mock<FeatureRegistry> | ||
|
||
const dependencyManager = new DependencyManagerMock() | ||
const featureRegistry = new FeatureRegistryMock() | ||
|
||
describe('MessagePickupModule', () => { | ||
test('registers dependencies on the dependency manager', () => { | ||
const module = new MessagePickupModule() | ||
module.register(dependencyManager, featureRegistry) | ||
|
||
expect(dependencyManager.registerContextScoped).toHaveBeenCalledTimes(1) | ||
expect(dependencyManager.registerContextScoped).toHaveBeenCalledWith(MessagePickupApi) | ||
|
||
expect(dependencyManager.registerInstance).toHaveBeenCalledTimes(1) | ||
expect(dependencyManager.registerInstance).toHaveBeenCalledWith(MessagePickupModuleConfig, module.config) | ||
|
||
expect(featureRegistry.register).toHaveBeenCalledTimes(2) | ||
expect(featureRegistry.register).toHaveBeenCalledWith( | ||
new Protocol({ | ||
id: 'https://didcomm.org/messagepickup/1.0', | ||
roles: ['message_holder', 'recipient', 'batch_sender', 'batch_recipient'], | ||
}) | ||
) | ||
expect(featureRegistry.register).toHaveBeenCalledWith( | ||
new Protocol({ | ||
id: 'https://didcomm.org/messagepickup/2.0', | ||
roles: ['mediator', 'recipient'], | ||
}) | ||
) | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't actually wait for the pickup to be returned. Should we add this to function documentationion or something?
If i understand correctly, mediation recipient will call the pickup in an interval, and you still need to handle the lifecycle in the mediation recipient module? Shouldn't we move all pickup logic, to the message pickup module? So picking up messages, whether implicit, explicit, live module, on an interval is managed by this module. The mediation recipient module is just a way to request mediation, register new keys, etc... (so really about the routing part)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true: this
pickupMessages
will perform a single loop that can be called once or within an interval, and will return before the actual messages were pickup (e.g. Batch or Status/Delivery messages are received). Do you think it would be good to make it sync in order to block until the loop is completed or fail in the case of a timeout?In this PR I left the lifecycle management in mediation module mostly to avoid introducing more breaking changes, but what I would like to do in a future is
MessagePickupApi
to create pickup loops that can be initiated/queried/stopped either byMediationRecipientApi
or externally.So I would say that MediationRecipientModule will be dedicated to the things you say and also holding the configuration (the
MediatorPickupStrategy
) and callMessagePickupApi
to start/stop pickup loops as needed at the initialization/shutdown/mediator switching/etc.