Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support for beanRef in KafkaListener annotation #1089

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
import java.util.Map;

public interface BindingFactory<T> {
default String getChannelId(T annotation) {
return ReferenceUtil.toValidId(getChannelName(annotation));
default String getChannelId(T annotation, Class<?> component) {
return ReferenceUtil.toValidId(getChannelName(annotation, component));
}

String getChannelName(T annotation);
String getChannelName(T annotation, Class<?> component);
ruskaof marked this conversation as resolved.
Show resolved Hide resolved

Map<String, ChannelBinding> buildChannelBinding(T annotation);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ private Stream<ChannelObject> mapClassToChannel(
Set<Method> methods =
annotatedMethods.stream().map(MethodAndAnnotation::method).collect(Collectors.toSet());
Map<String, Message> messages = new HashMap<>(springAnnotationMessagesService.buildMessages(
classAnnotation, methods, SpringAnnotationMessagesService.MessageType.CHANNEL));
classAnnotation, component, methods, SpringAnnotationMessagesService.MessageType.CHANNEL));

return mapClassToChannel(classAnnotation, messages);
return mapClassToChannel(classAnnotation, component, messages);
}

private Stream<ChannelObject> mapClassToChannel(ClassAnnotation classAnnotation, Map<String, Message> messages) {
return Stream.of(springAnnotationChannelService.buildChannel(classAnnotation, messages));
private Stream<ChannelObject> mapClassToChannel(
ClassAnnotation classAnnotation, Class<?> component, Map<String, Message> messages) {
return Stream.of(springAnnotationChannelService.buildChannel(classAnnotation, component, messages));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ private ChannelObject mapMethodToChannel(MethodAndAnnotation<MethodAnnotation> m
MessageObject message = springAnnotationMessageService.buildMessage(annotation, payloadSchema, headerSchema);
Map<String, Message> messages = Map.of(message.getMessageId(), MessageReference.toComponentMessage(message));

return springAnnotationChannelService.buildChannel(annotation, messages);
return springAnnotationChannelService.buildChannel(
annotation, method.method().getDeclaringClass(), messages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ public class SpringAnnotationChannelService<Annotation extends java.lang.annotat

private final BindingFactory<Annotation> bindingFactory;

public ChannelObject buildChannel(Annotation annotation, Map<String, Message> messages) {
public ChannelObject buildChannel(Annotation annotation, Class<?> component, Map<String, Message> messages) {
Map<String, ChannelBinding> channelBinding = bindingFactory.buildChannelBinding(annotation);
Map<String, ChannelBinding> chBinding = channelBinding != null ? new HashMap<>(channelBinding) : null;
String channelName = bindingFactory.getChannelName(annotation);
String channelName = bindingFactory.getChannelName(annotation, component);
return ChannelObject.builder()
.channelId(ReferenceUtil.toValidId(channelName))
.address(channelName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ public enum MessageType {
}

public Map<String, MessageReference> buildMessages(
ClassAnnotation classAnnotation, Set<Method> methods, MessageType messageType) {
ClassAnnotation classAnnotation, Class<?> component, Set<Method> methods, MessageType messageType) {
Set<MessageObject> messages = methods.stream()
.map(method -> buildMessage(classAnnotation, method))
.collect(toSet());

if (messageType == MessageType.OPERATION) {
String channelId = bindingFactory.getChannelName(classAnnotation);
String channelId = bindingFactory.getChannelName(classAnnotation, component);
return toOperationsMessagesMap(channelId, messages);
}
return toMessagesMap(messages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ public class SpringAnnotationOperationService<MethodAnnotation extends Annotatio
private final SpringAnnotationMessageService<MethodAnnotation> springAnnotationMessageService;

public Operation buildOperation(
MethodAnnotation annotation, PayloadSchemaObject payloadType, SchemaObject headerSchema) {
MethodAnnotation annotation,
Class<?> component,
PayloadSchemaObject payloadType,
SchemaObject headerSchema) {
MessageObject message = springAnnotationMessageService.buildMessage(annotation, payloadType, headerSchema);
Map<String, OperationBinding> operationBinding = bindingFactory.buildOperationBinding(annotation);
Map<String, OperationBinding> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
String channelId = bindingFactory.getChannelId(annotation);
String channelId = bindingFactory.getChannelId(annotation, component);

return Operation.builder()
.action(OperationAction.RECEIVE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ public class SpringAnnotationOperationsService<ClassAnnotation extends Annotatio
private final BindingFactory<ClassAnnotation> bindingFactory;
private final SpringAnnotationMessagesService<ClassAnnotation> springAnnotationMessagesService;

public Operation buildOperation(ClassAnnotation classAnnotation, Set<Method> methods) {
public Operation buildOperation(ClassAnnotation classAnnotation, Class<?> component, Set<Method> methods) {
var messages = springAnnotationMessagesService.buildMessages(
classAnnotation, methods, SpringAnnotationMessagesService.MessageType.OPERATION);
return buildOperation(classAnnotation, messages);
classAnnotation, component, methods, SpringAnnotationMessagesService.MessageType.OPERATION);
return buildOperation(classAnnotation, component, messages);
}

private Operation buildOperation(ClassAnnotation classAnnotation, Map<String, MessageReference> messages) {
private Operation buildOperation(
ClassAnnotation classAnnotation, Class<?> component, Map<String, MessageReference> messages) {
Map<String, OperationBinding> operationBinding = bindingFactory.buildOperationBinding(classAnnotation);
Map<String, OperationBinding> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
String channelName = bindingFactory.getChannelName(classAnnotation);
String channelName = bindingFactory.getChannelName(classAnnotation, component);
String channelId = ReferenceUtil.toValidId(channelName);

return Operation.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ private Stream<Map.Entry<String, Operation>> mapClassToOperation(
Class<?> component, Set<MethodAndAnnotation<MethodAnnotation>> annotatedMethods) {
ClassAnnotation classAnnotation = AnnotationUtil.findFirstAnnotationOrThrow(classAnnotationClass, component);

String channelId = bindingFactory.getChannelId(classAnnotation);
String channelId = bindingFactory.getChannelId(classAnnotation, component);
String operationId =
StringUtils.joinWith("_", channelId, OperationAction.RECEIVE.type, component.getSimpleName());

Set<Method> methods =
annotatedMethods.stream().map(MethodAndAnnotation::method).collect(Collectors.toSet());
Operation operation = springAnnotationOperationsService.buildOperation(classAnnotation, methods);
Operation operation = springAnnotationOperationsService.buildOperation(classAnnotation, component, methods);
annotatedMethods.forEach(
method -> customizers.forEach(customizer -> customizer.customize(operation, method.method())));
return Stream.of(Map.entry(operationId, operation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ public Stream<Map.Entry<String, Operation>> scan(Class<?> clazz) {
private Map.Entry<String, Operation> mapMethodToOperation(MethodAndAnnotation<MethodAnnotation> method) {
MethodAnnotation annotation = AnnotationUtil.findFirstAnnotationOrThrow(methodAnnotationClass, method.method());

String channelId = bindingFactory.getChannelId(annotation);
String channelId =
bindingFactory.getChannelId(annotation, method.method().getDeclaringClass());
String operationId = StringUtils.joinWith(
"_", channelId, OperationAction.RECEIVE.type, method.method().getName());

PayloadSchemaObject payloadSchema = payloadMethodParameterService.extractSchema(method.method());
SchemaObject headerSchema = headerClassExtractor.extractHeader(method.method(), payloadSchema);

Operation operation = springAnnotationOperationService.buildOperation(annotation, payloadSchema, headerSchema);
Operation operation = springAnnotationOperationService.buildOperation(
annotation, method.method().getDeclaringClass(), payloadSchema, headerSchema);
customizers.forEach(customizer -> customizer.customize(operation, method.method()));
return Map.entry(operationId, operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ static class TestBindingFactory implements BindingFactory<TestClassListener> {
Map.of(CHANNEL_ID, new TestBindingFactory.TestOperationBinding());

@Override
public String getChannelName(TestClassListener annotation) {
public String getChannelName(TestClassListener annotation, Class<?> component) {
return CHANNEL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void scan() {
MessageReference message = MessageReference.toComponentMessage("messageId");
Map<String, Message> messages = Map.of("messageId", message);

when(springAnnotationChannelService.buildChannel(any(), any()))
when(springAnnotationChannelService.buildChannel(any(), any(), any()))
.thenReturn(ChannelObject.builder()
.channelId(TestBindingFactory.CHANNEL_ID)
.messages(messages)
Expand All @@ -58,7 +58,7 @@ void scan() {

int methodsInClass = 2;
verify(springAnnotationMessagesService)
.buildMessages(any(), argThat(list -> list.size() == methodsInClass), any());
.buildMessages(any(), any(), argThat(list -> list.size() == methodsInClass), any());
}

@TestClassListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ static class TestBindingFactory implements BindingFactory<TestChannelListener> {
Map.of(CHANNEL_ID, new TestBindingFactory.TestOperationBinding());

@Override
public String getChannelName(TestChannelListener annotation) {
public String getChannelName(TestChannelListener annotation, Class<?> component) {
return CHANNEL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void scan() {
.build();

when(springAnnotationMessageService.buildMessage(any(), any(), any())).thenReturn(message);
when(springAnnotationChannelService.buildChannel(any(), any())).thenReturn(expectedChannelItem);
when(springAnnotationChannelService.buildChannel(any(), any(), any())).thenReturn(expectedChannelItem);

// when
List<ChannelObject> channels = scanner.scan(ClassWithTestListenerAnnotation.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SpringAnnotationOperationServiceTest {
@BeforeEach
void setUp() {
// when
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_ID);
when(bindingFactory.getChannelId(any(), any())).thenReturn(CHANNEL_ID);
doReturn(defaultOperationBinding).when(bindingFactory).buildOperationBinding(any());
}

Expand All @@ -59,8 +59,8 @@ void scan_componentHasTestListenerMethods() throws NoSuchMethodException {
.thenReturn(messageObject);

// when
Operation operations =
springAnnotationOperationService.buildOperation(annotation, payloadSchemaName, headerSchema);
Operation operations = springAnnotationOperationService.buildOperation(
annotation, ClassWithTestListenerAnnotation.class, payloadSchemaName, headerSchema);

// then
Operation expectedOperation = Operation.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ class SpringAnnotationClassLevelOperationsScannerTest {

@BeforeEach
void setUp() {
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_NAME_ID);
when(bindingFactory.getChannelId(any(), any())).thenReturn(CHANNEL_NAME_ID);
}

@Test
void scan() {
// given
Operation operation = Operation.builder().build();
when(springAnnotationOperationsService.buildOperation(any(), anySet())).thenReturn(operation);
when(springAnnotationOperationsService.buildOperation(any(), any(), anySet()))
.thenReturn(operation);

// when
List<Map.Entry<String, Operation>> operations =
Expand All @@ -64,7 +65,8 @@ void scan() {
void operationCustomizerIsCalled() {
// given
Operation operation = Operation.builder().build();
when(springAnnotationOperationsService.buildOperation(any(), anySet())).thenReturn(operation);
when(springAnnotationOperationsService.buildOperation(any(), any(), anySet()))
.thenReturn(operation);

// when
scanner.scan(ClassWithTestListenerAnnotation.class).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ class SpringAnnotationMethodLevelOperationsScannerTest {

@BeforeEach
void setUp() {
when(bindingFactory.getChannelId(any())).thenReturn(CHANNEL_ID);
when(bindingFactory.getChannelId(any(), any())).thenReturn(CHANNEL_ID);
}

@Test
void scan_componentHasTestListenerMethods() {
// given
Operation operation = Operation.builder().build();
when(springAnnotationOperationService.buildOperation(any(), any(), any()))
when(springAnnotationOperationService.buildOperation(any(), any(), any(), any()))
.thenReturn(operation);

// when
Expand All @@ -63,7 +63,7 @@ void scan_componentHasTestListenerMethods() {
@Test
void operationCustomizerIsCalled() { // given
Operation operation = Operation.builder().build();
when(springAnnotationOperationService.buildOperation(any(), any(), any()))
when(springAnnotationOperationService.buildOperation(any(), any(), any(), any()))
.thenReturn(operation);

// when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public AmqpBindingFactory(
}

@Override
public String getChannelName(RabbitListener annotation) {
public String getChannelName(RabbitListener annotation, Class<?> component) {
return RabbitListenerUtil.getChannelName(annotation, stringValueResolver);
}

@Override
public String getChannelId(RabbitListener annotation) {
public String getChannelId(RabbitListener annotation, Class<?> component) {
return RabbitListenerUtil.getChannelId(annotation, stringValueResolver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class JmsBindingFactory implements BindingFactory<JmsListener> {
private final StringValueResolver stringValueResolver;

@Override
public String getChannelName(JmsListener annotation) {
public String getChannelName(JmsListener annotation, Class<?> component) {
return JmsListenerUtil.getChannelName(annotation, stringValueResolver);
}

Expand Down
Loading