47
47
import java .util .Objects ;
48
48
import java .util .stream .Collectors ;
49
49
50
-
51
50
@ Configuration
52
51
public class ExtConsumerResetConfiguration implements ApplicationContextAware , SmartInitializingSingleton {
53
52
private static final Logger log = LoggerFactory .getLogger (ExtConsumerResetConfiguration .class );
@@ -61,7 +60,7 @@ public class ExtConsumerResetConfiguration implements ApplicationContextAware, S
61
60
private RocketMQMessageConverter rocketMQMessageConverter ;
62
61
63
62
public ExtConsumerResetConfiguration (RocketMQMessageConverter rocketMQMessageConverter ,
64
- ConfigurableEnvironment environment , RocketMQProperties rocketMQProperties ) {
63
+ ConfigurableEnvironment environment , RocketMQProperties rocketMQProperties ) {
65
64
this .rocketMQMessageConverter = rocketMQMessageConverter ;
66
65
this .environment = environment ;
67
66
this .rocketMQProperties = rocketMQProperties ;
@@ -75,9 +74,9 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
75
74
@ Override
76
75
public void afterSingletonsInstantiated () {
77
76
Map <String , Object > beans = this .applicationContext
78
- .getBeansWithAnnotation (org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration .class )
79
- .entrySet ().stream ().filter (entry -> !ScopedProxyUtils .isScopedTarget (entry .getKey ()))
80
- .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
77
+ .getBeansWithAnnotation (org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration .class )
78
+ .entrySet ().stream ().filter (entry -> !ScopedProxyUtils .isScopedTarget (entry .getKey ()))
79
+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
81
80
beans .forEach (this ::registerTemplate );
82
81
}
83
82
@@ -103,10 +102,12 @@ private void registerTemplate(String beanName, Object bean) {
103
102
rocketMQTemplate .setSimpleConsumerBuilder (consumerBuilder );
104
103
rocketMQTemplate .setSimpleConsumer (simpleConsumer );
105
104
rocketMQTemplate .setMessageConverter (rocketMQMessageConverter .getMessageConverter ());
106
- log .info ("Set real simpleConsumer to :{} {}" , beanName , annotation .value ());
105
+ String topic = environment .resolvePlaceholders (annotation .topic ());
106
+ log .info ("Set real simpleConsumer to {} using {} topic" , beanName , topic );
107
107
}
108
108
109
- private SimpleConsumerBuilder createConsumer (org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration annotation ) {
109
+ private SimpleConsumerBuilder createConsumer (
110
+ org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration annotation ) {
110
111
RocketMQProperties .SimpleConsumer simpleConsumer = rocketMQProperties .getSimpleConsumer ();
111
112
String consumerGroupName = resolvePlaceholders (annotation .consumerGroup (), simpleConsumer .getConsumerGroup ());
112
113
String topicName = resolvePlaceholders (annotation .topic (), simpleConsumer .getTopic ());
@@ -142,12 +143,12 @@ private String resolvePlaceholders(String text, String defaultValue) {
142
143
}
143
144
144
145
private void validate (org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration annotation ,
145
- GenericApplicationContext genericApplicationContext ) {
146
+ GenericApplicationContext genericApplicationContext ) {
146
147
if (genericApplicationContext .isBeanNameInUse (annotation .value ())) {
147
148
throw new BeanDefinitionValidationException (
148
- String .format ("Bean {} has been used in Spring Application Context, " +
149
- "please check the @ExtRocketMQConsumerConfiguration" ,
150
- annotation .value ()));
149
+ String .format ("Bean {} has been used in Spring Application Context, " +
150
+ "please check the @ExtRocketMQConsumerConfiguration" ,
151
+ annotation .value ()));
151
152
}
152
153
}
153
154
}
0 commit comments