Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add message pickup module #1413

Merged
merged 7 commits into from
Apr 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/src/agent/AgentModules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { CredentialsModule } from '../modules/credentials'
import { DidsModule } from '../modules/dids'
import { DiscoverFeaturesModule } from '../modules/discover-features'
import { GenericRecordsModule } from '../modules/generic-records'
import { MessagePickupModule } from '../modules/message-pìckup'
import { OutOfBandModule } from '../modules/oob'
import { ProofsModule } from '../modules/proofs'
import { MediatorModule, MediationRecipientModule } from '../modules/routing'
Expand Down Expand Up @@ -121,6 +122,7 @@ function getDefaultAgentModules() {
proofs: () => new ProofsModule(),
mediator: () => new MediatorModule(),
mediationRecipient: () => new MediationRecipientModule(),
messagePickup: () => new MessagePickupModule(),
basicMessages: () => new BasicMessagesModule(),
genericRecords: () => new GenericRecordsModule(),
discovery: () => new DiscoverFeaturesModule(),
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/agent/BaseAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { AgentApi, CustomOrDefaultApi, EmptyModuleMap, ModulesMap, WithoutD
import type { TransportSession } from './TransportService'
import type { Logger } from '../logger'
import type { CredentialsModule } from '../modules/credentials'
import type { MessagePickupModule } from '../modules/message-pìckup'
import type { ProofsModule } from '../modules/proofs'
import type { DependencyManager } from '../plugins'

Expand All @@ -13,6 +14,7 @@ import { CredentialsApi } from '../modules/credentials'
import { DidsApi } from '../modules/dids'
import { DiscoverFeaturesApi } from '../modules/discover-features'
import { GenericRecordsApi } from '../modules/generic-records'
import { MessagePickupApi } from '../modules/message-pìckup/MessagePickupApi'
import { OutOfBandApi } from '../modules/oob'
import { ProofsApi } from '../modules/proofs'
import { MediatorApi, MediationRecipientApi } from '../modules/routing'
Expand Down Expand Up @@ -47,6 +49,7 @@ export abstract class BaseAgent<AgentModules extends ModulesMap = EmptyModuleMap
public readonly proofs: CustomOrDefaultApi<AgentModules['proofs'], ProofsModule>
public readonly mediator: MediatorApi
public readonly mediationRecipient: MediationRecipientApi
public readonly messagePickup: CustomOrDefaultApi<AgentModules['messagePickup'], MessagePickupModule>
public readonly basicMessages: BasicMessagesApi
public readonly genericRecords: GenericRecordsApi
public readonly discovery: DiscoverFeaturesApi
Expand Down Expand Up @@ -90,6 +93,10 @@ export abstract class BaseAgent<AgentModules extends ModulesMap = EmptyModuleMap
this.proofs = this.dependencyManager.resolve(ProofsApi) as CustomOrDefaultApi<AgentModules['proofs'], ProofsModule>
this.mediator = this.dependencyManager.resolve(MediatorApi)
this.mediationRecipient = this.dependencyManager.resolve(MediationRecipientApi)
this.messagePickup = this.dependencyManager.resolve(MessagePickupApi) as CustomOrDefaultApi<
AgentModules['messagePickup'],
MessagePickupModule
>
this.basicMessages = this.dependencyManager.resolve(BasicMessagesApi)
this.genericRecords = this.dependencyManager.resolve(GenericRecordsApi)
this.discovery = this.dependencyManager.resolve(DiscoverFeaturesApi)
Expand All @@ -103,6 +110,7 @@ export abstract class BaseAgent<AgentModules extends ModulesMap = EmptyModuleMap
this.proofs,
this.mediator,
this.mediationRecipient,
this.messagePickup,
this.basicMessages,
this.genericRecords,
this.discovery,
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/agent/__tests__/Agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { ConnectionService } from '../../modules/connections/services/Connection
import { TrustPingService } from '../../modules/connections/services/TrustPingService'
import { CredentialRepository } from '../../modules/credentials'
import { CredentialsApi } from '../../modules/credentials/CredentialsApi'
import { MessagePickupApi } from '../../modules/message-pìckup'
import { ProofRepository } from '../../modules/proofs'
import { ProofsApi } from '../../modules/proofs/ProofsApi'
import {
Expand Down Expand Up @@ -171,6 +172,7 @@ describe('Agent', () => {

expect(container.resolve(MediatorApi)).toBeInstanceOf(MediatorApi)
expect(container.resolve(MediationRecipientApi)).toBeInstanceOf(MediationRecipientApi)
expect(container.resolve(MessagePickupApi)).toBeInstanceOf(MessagePickupApi)
expect(container.resolve(MediationRepository)).toBeInstanceOf(MediationRepository)
expect(container.resolve(MediatorService)).toBeInstanceOf(MediatorService)
expect(container.resolve(MediationRecipientService)).toBeInstanceOf(MediationRecipientService)
Expand Down Expand Up @@ -208,6 +210,7 @@ describe('Agent', () => {

expect(container.resolve(MediatorApi)).toBe(container.resolve(MediatorApi))
expect(container.resolve(MediationRecipientApi)).toBe(container.resolve(MediationRecipientApi))
expect(container.resolve(MessagePickupApi)).toBe(container.resolve(MessagePickupApi))
expect(container.resolve(MediationRepository)).toBe(container.resolve(MediationRepository))
expect(container.resolve(MediatorService)).toBe(container.resolve(MediatorService))
expect(container.resolve(MediationRecipientService)).toBe(container.resolve(MediationRecipientService))
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/agent/__tests__/AgentModules.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { CredentialsModule } from '../../modules/credentials'
import { DidsModule } from '../../modules/dids'
import { DiscoverFeaturesModule } from '../../modules/discover-features'
import { GenericRecordsModule } from '../../modules/generic-records'
import { MessagePickupModule } from '../../modules/message-pìckup'
import { OutOfBandModule } from '../../modules/oob'
import { ProofsModule } from '../../modules/proofs'
import { MediatorModule, MediationRecipientModule } from '../../modules/routing'
Expand Down Expand Up @@ -59,6 +60,7 @@ describe('AgentModules', () => {
proofs: expect.any(ProofsModule),
mediator: expect.any(MediatorModule),
mediationRecipient: expect.any(MediationRecipientModule),
messagePickup: expect.any(MessagePickupModule),
basicMessages: expect.any(BasicMessagesModule),
genericRecords: expect.any(GenericRecordsModule),
discovery: expect.any(DiscoverFeaturesModule),
Expand All @@ -82,6 +84,7 @@ describe('AgentModules', () => {
proofs: expect.any(ProofsModule),
mediator: expect.any(MediatorModule),
mediationRecipient: expect.any(MediationRecipientModule),
messagePickup: expect.any(MessagePickupModule),
basicMessages: expect.any(BasicMessagesModule),
genericRecords: expect.any(GenericRecordsModule),
discovery: expect.any(DiscoverFeaturesModule),
Expand All @@ -108,6 +111,7 @@ describe('AgentModules', () => {
proofs: expect.any(ProofsModule),
mediator: expect.any(MediatorModule),
mediationRecipient: expect.any(MediationRecipientModule),
messagePickup: expect.any(MessagePickupModule),
basicMessages: expect.any(BasicMessagesModule),
genericRecords: expect.any(GenericRecordsModule),
discovery: expect.any(DiscoverFeaturesModule),
Expand Down
96 changes: 96 additions & 0 deletions packages/core/src/modules/message-pìckup/MessagePickupApi.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import type {
PickupMessagesOptions,
PickupMessagesReturnType,
QueueMessageOptions,
QueueMessageReturnType,
} from './MessagePickupApiOptions'
import type { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol'
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol'
import type { MessageRepository } from '../../storage/MessageRepository'

import { AgentContext } from '../../agent'
import { MessageSender } from '../../agent/MessageSender'
import { OutboundMessageContext } from '../../agent/models'
import { InjectionSymbols } from '../../constants'
import { AriesFrameworkError } from '../../error'
import { injectable } from '../../plugins'
import { ConnectionService } from '../connections/services'

import { MessagePickupModuleConfig } from './MessagePickupModuleConfig'

export interface MessagePickupApi<MPPs extends MessagePickupProtocol[]> {
queueMessage(options: QueueMessageOptions): Promise<QueueMessageReturnType>
pickupMessages(options: PickupMessagesOptions<MPPs>): Promise<PickupMessagesReturnType>
}

@injectable()
export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessagePickupProtocol, V2MessagePickupProtocol]>
implements MessagePickupApi<MPPs>
{
public config: MessagePickupModuleConfig<MPPs>

private messageSender: MessageSender
private agentContext: AgentContext
private connectionService: ConnectionService

public constructor(
messageSender: MessageSender,
agentContext: AgentContext,
connectionService: ConnectionService,
config: MessagePickupModuleConfig<MPPs>
) {
this.messageSender = messageSender
this.connectionService = connectionService
this.agentContext = agentContext
this.config = config
}

private getProtocol<MPP extends MPPs[number]['version']>(protocolVersion: MPP): MessagePickupProtocol {
const protocol = this.config.protocols.find((protocol) => protocol.version === protocolVersion)

if (!protocol) {
throw new AriesFrameworkError(`No message pickup protocol registered for protocol version ${protocolVersion}`)
}

return protocol
}

/**
* Add an encrypted message to the message pickup queue
*
* @param options: connectionId associated to the message and the encrypted message itself
*/
public async queueMessage(options: QueueMessageOptions): Promise<QueueMessageReturnType> {
const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId)

const messageRepository = this.agentContext.dependencyManager.resolve<MessageRepository>(
InjectionSymbols.MessageRepository
)

await messageRepository.add(connectionRecord.id, options.message)
}

/**
* Pickup queued messages from a message holder. It attempts to retrieve all current messages from the
* queue, receiving up to `batchSize` messages per batch retrieval.
*
* @param options connectionId, protocol version to use and batch size
*/
public async pickupMessages(options: PickupMessagesOptions<MPPs>): Promise<PickupMessagesReturnType> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually wait for the pickup to be returned. Should we add this to function documentationion or something?

If i understand correctly, mediation recipient will call the pickup in an interval, and you still need to handle the lifecycle in the mediation recipient module? Shouldn't we move all pickup logic, to the message pickup module? So picking up messages, whether implicit, explicit, live module, on an interval is managed by this module. The mediation recipient module is just a way to request mediation, register new keys, etc... (so really about the routing part)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true: this pickupMessages will perform a single loop that can be called once or within an interval, and will return before the actual messages were pickup (e.g. Batch or Status/Delivery messages are received). Do you think it would be good to make it sync in order to block until the loop is completed or fail in the case of a timeout?

In this PR I left the lifecycle management in mediation module mostly to avoid introducing more breaking changes, but what I would like to do in a future is MessagePickupApi to create pickup loops that can be initiated/queried/stopped either by MediationRecipientApi or externally.

So I would say that MediationRecipientModule will be dedicated to the things you say and also holding the configuration (the MediatorPickupStrategy) and call MessagePickupApi to start/stop pickup loops as needed at the initialization/shutdown/mediator switching/etc.

const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId)

const protocol = this.getProtocol(options.protocolVersion)
const { message } = await protocol.pickupMessages(this.agentContext, {
connectionRecord,
batchSize: options.batchSize,
recipientKey: options.recipientKey,
})

await this.messageSender.sendMessage(
new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection: connectionRecord,
})
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol'
import type { EncryptedMessage } from '../../types'

/**
* Get the supported protocol versions based on the provided discover features services.
*/
export type MessagePickupProtocolVersionType<MPPs extends MessagePickupProtocol[]> = MPPs[number]['version']

export interface QueueMessageOptions {
connectionId: string
message: EncryptedMessage
}

export interface PickupMessagesOptions<MPPs extends MessagePickupProtocol[] = MessagePickupProtocol[]> {
connectionId: string
protocolVersion: MessagePickupProtocolVersionType<MPPs>
recipientKey?: string
batchSize?: number
}

export type QueueMessageReturnType = void

export type PickupMessagesReturnType = void
60 changes: 60 additions & 0 deletions packages/core/src/modules/message-pìckup/MessagePickupModule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { MessagePickupModuleConfigOptions } from './MessagePickupModuleConfig'
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol'
import type { FeatureRegistry } from '../../agent/FeatureRegistry'
import type { ApiModule, DependencyManager } from '../../plugins'
import type { Optional } from '../../utils'
import type { Constructor } from '../../utils/mixins'

import { InjectionSymbols } from '../../constants'

import { MessagePickupApi } from './MessagePickupApi'
import { MessagePickupModuleConfig } from './MessagePickupModuleConfig'
import { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol'

/**
* Default protocols that will be registered if the `protocols` property is not configured.
*/
export type DefaultMessagePickupProtocols = [V1MessagePickupProtocol, V2MessagePickupProtocol]

// MessagePickupModuleOptions makes the protocols property optional from the config, as it will set it when not provided.
export type MessagePickupModuleOptions<MessagePickupProtocols extends MessagePickupProtocol[]> = Optional<
MessagePickupModuleConfigOptions<MessagePickupProtocols>,
'protocols'
>

export class MessagePickupModule<MessagePickupProtocols extends MessagePickupProtocol[] = DefaultMessagePickupProtocols>
implements ApiModule
{
public readonly config: MessagePickupModuleConfig<MessagePickupProtocols>

// Infer Api type from the config
public readonly api: Constructor<MessagePickupApi<MessagePickupProtocols>> = MessagePickupApi

public constructor(config?: MessagePickupModuleOptions<MessagePickupProtocols>) {
this.config = new MessagePickupModuleConfig({
...config,
protocols: config?.protocols ?? [new V1MessagePickupProtocol(), new V2MessagePickupProtocol()],
}) as MessagePickupModuleConfig<MessagePickupProtocols>
}

/**
* Registers the dependencies of the question answer module on the dependency manager.
*/
public register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry) {
// Api
dependencyManager.registerContextScoped(MessagePickupApi)

// Config
dependencyManager.registerInstance(MessagePickupModuleConfig, this.config)

// Message repository
if (this.config.messageRepository) {
dependencyManager.registerInstance(InjectionSymbols.MessageRepository, this.config.messageRepository)
}

// Protocol needs to register feature registry items and handlers
for (const protocol of this.config.protocols) {
protocol.register(dependencyManager, featureRegistry)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol'
import type { MessageRepository } from '../../storage/MessageRepository'

/**
* MessagePickupModuleConfigOptions defines the interface for the options of the MessagePickupModuleConfig class.
* This can contain optional parameters that have default values in the config class itself.
*/
export interface MessagePickupModuleConfigOptions<MessagePickupProtocols extends MessagePickupProtocol[]> {
/**
* Maximum number of messages to retrieve in a single batch message pickup
*
* @default 10
*/
maximumBatchSize?: number

/**
* Message pickup protocols to make available to the message pickup module. Only one protocol should be registered for each
* protocol version.
*
* When not provided, V1MessagePickupProtocol and V2MessagePickupProtocol` are registered by default.
*
* @default
* ```
* [V1MessagePickupProtocol, V2MessagePickupProtocol]
* ```
*/
protocols: MessagePickupProtocols

/**
* Allows to specify a custom pickup message queue. It defaults to an in-memory repository
*
*/
messageRepository?: MessageRepository
Comment on lines +29 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious what the line is between mediator vs message pickup. Is the mediator just about routing, and is message pickup module about queuing messages? As you mentioned before, allowing messages to be queued even without mediator (so just e.g. issuer that queues messages for the holder directly) should be possible, in which case this approach makes sense.

Does the message pickup module still support implicit pickup? And if so, what's the protocolVersion in that case?!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to dig a little bit more about it but I initially I see "Message Pickup" mainly dedicated to the explicit pickup and setting up the pickup mode in general. Although the "live mode" can be also seen as an implicit mode, it can probably also handled by this module (maybe using a specific queue as suggested by @rodolfomiranda in hyperledger/aries-rfcs#760).

The implicit mode is tricky because to me it's like a particular case that works for ACA-Py but don't see it specified in an RFC. Maybe it can still be triggered from the Mediation Recipient module (as it is being done right now) or left it to be handled from outside of the framework: for instance, we can add MediatorWebSocketConnected and MediatorWebSocketDisconnected events so the consumer application can manage the reconnections as needed.

}

export class MessagePickupModuleConfig<MessagePickupProtocols extends MessagePickupProtocol[]> {
private options: MessagePickupModuleConfigOptions<MessagePickupProtocols>

public constructor(options: MessagePickupModuleConfigOptions<MessagePickupProtocols>) {
this.options = options
}

/** See {@link MessagePickupModuleConfig.maximumBatchSize} */
public get maximumBatchSize() {
return this.options.maximumBatchSize ?? 10
}

/** See {@link MessagePickupModuleConfig.protocols} */
public get protocols() {
return this.options.protocols
}

/** See {@link MessagePickupModuleConfig.protocols} */
public get messageRepository() {
return this.options.messageRepository
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { FeatureRegistry } from '../../../agent/FeatureRegistry'
import { Protocol } from '../../../agent/models'
import { DependencyManager } from '../../../plugins/DependencyManager'
import { MessagePickupApi } from '../MessagePickupApi'
import { MessagePickupModule } from '../MessagePickupModule'
import { MessagePickupModuleConfig } from '../MessagePickupModuleConfig'

jest.mock('../../../plugins/DependencyManager')
const DependencyManagerMock = DependencyManager as jest.Mock<DependencyManager>

jest.mock('../../../agent/FeatureRegistry')
const FeatureRegistryMock = FeatureRegistry as jest.Mock<FeatureRegistry>

const dependencyManager = new DependencyManagerMock()
const featureRegistry = new FeatureRegistryMock()

describe('MessagePickupModule', () => {
test('registers dependencies on the dependency manager', () => {
const module = new MessagePickupModule()
module.register(dependencyManager, featureRegistry)

expect(dependencyManager.registerContextScoped).toHaveBeenCalledTimes(1)
expect(dependencyManager.registerContextScoped).toHaveBeenCalledWith(MessagePickupApi)

expect(dependencyManager.registerInstance).toHaveBeenCalledTimes(1)
expect(dependencyManager.registerInstance).toHaveBeenCalledWith(MessagePickupModuleConfig, module.config)

expect(featureRegistry.register).toHaveBeenCalledTimes(2)
expect(featureRegistry.register).toHaveBeenCalledWith(
new Protocol({
id: 'https://didcomm.org/messagepickup/1.0',
roles: ['message_holder', 'recipient', 'batch_sender', 'batch_recipient'],
})
)
expect(featureRegistry.register).toHaveBeenCalledWith(
new Protocol({
id: 'https://didcomm.org/messagepickup/2.0',
roles: ['mediator', 'recipient'],
})
)
})
})
Loading