@@ -92,22 +92,25 @@ private void registerTemplate(String beanName, Object bean) {
92
92
93
93
SimpleConsumerBuilder consumerBuilder = null ;
94
94
SimpleConsumer simpleConsumer = null ;
95
+ SimpleConsumerInfo simpleConsumerInfo = null ;
96
+
95
97
try {
96
- consumerBuilder = createConsumer (annotation );
97
- simpleConsumer = consumerBuilder .build ();
98
+ final ClientServiceProvider provider = ClientServiceProvider .loadService ();
99
+ SimpleConsumerBuilder simpleConsumerBuilder = provider .newSimpleConsumerBuilder ();
100
+ simpleConsumerInfo = createConsumer (annotation , simpleConsumerBuilder );
98
101
} catch (Exception e ) {
99
102
log .error ("Failed to startup SimpleConsumer for RocketMQTemplate {}" , beanName , e );
100
103
}
101
104
RocketMQClientTemplate rocketMQTemplate = (RocketMQClientTemplate ) bean ;
102
105
rocketMQTemplate .setSimpleConsumerBuilder (consumerBuilder );
103
106
rocketMQTemplate .setSimpleConsumer (simpleConsumer );
104
107
rocketMQTemplate .setMessageConverter (rocketMQMessageConverter .getMessageConverter ());
105
- String topic = environment .resolvePlaceholders (annotation .topic ());
106
- log .info ("Set real simpleConsumer to {} using {} topic" , beanName , topic );
108
+ log .info ("Set real simpleConsumer {} to {}" , simpleConsumerInfo , beanName );
107
109
}
108
110
109
- private SimpleConsumerBuilder createConsumer (
110
- org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration annotation ) {
111
+ private SimpleConsumerInfo createConsumer (
112
+ org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration annotation ,
113
+ SimpleConsumerBuilder simpleConsumerBuilder ) {
111
114
RocketMQProperties .SimpleConsumer simpleConsumer = rocketMQProperties .getSimpleConsumer ();
112
115
String consumerGroupName = resolvePlaceholders (annotation .consumerGroup (), simpleConsumer .getConsumerGroup ());
113
116
String topicName = resolvePlaceholders (annotation .topic (), simpleConsumer .getTopic ());
@@ -122,10 +125,8 @@ private SimpleConsumerBuilder createConsumer(
122
125
Boolean sslEnabled = simpleConsumer .isSslEnabled ();
123
126
Assert .hasText (topicName , "[topic] must not be null" );
124
127
ClientConfiguration clientConfiguration = RocketMQUtil .createClientConfiguration (accessKey , secretKey , endPoints , requestTimeout , sslEnabled , namespace );
125
- final ClientServiceProvider provider = ClientServiceProvider .loadService ();
126
128
FilterExpression filterExpression = RocketMQUtil .createFilterExpression (tag , filterExpressionType );
127
129
Duration duration = Duration .ofSeconds (awaitDuration );
128
- SimpleConsumerBuilder simpleConsumerBuilder = provider .newSimpleConsumerBuilder ();
129
130
simpleConsumerBuilder .setClientConfiguration (clientConfiguration );
130
131
if (StringUtils .hasLength (consumerGroupName )) {
131
132
simpleConsumerBuilder .setConsumerGroup (consumerGroupName );
@@ -134,7 +135,8 @@ private SimpleConsumerBuilder createConsumer(
134
135
if (Objects .nonNull (filterExpression )) {
135
136
simpleConsumerBuilder .setSubscriptionExpressions (Collections .singletonMap (topicName , filterExpression ));
136
137
}
137
- return simpleConsumerBuilder ;
138
+
139
+ return new SimpleConsumerInfo (consumerGroupName , topicName , endPoints , namespace , tag , filterExpressionType , requestTimeout , awaitDuration , sslEnabled );
138
140
}
139
141
140
142
private String resolvePlaceholders (String text , String defaultValue ) {
@@ -151,4 +153,51 @@ private void validate(org.apache.rocketmq.client.annotation.ExtConsumerResetConf
151
153
annotation .value ()));
152
154
}
153
155
}
156
+
157
+ static class SimpleConsumerInfo {
158
+ String consumerGroup ;
159
+
160
+ String topicName ;
161
+
162
+ String endPoints ;
163
+
164
+ String namespace ;
165
+
166
+ String tag ;
167
+
168
+ String filterExpressionType ;
169
+
170
+ Duration requestTimeout ;
171
+
172
+ int awaitDuration ;
173
+
174
+ Boolean sslEnabled ;
175
+
176
+ public SimpleConsumerInfo (String consumerGroupName , String topicName , String endPoints , String namespace ,
177
+ String tag , String filterExpressionType , Duration requestTimeout , int awaitDuration , Boolean sslEnabled ) {
178
+ this .consumerGroup = consumerGroupName ;
179
+ this .topicName = topicName ;
180
+ this .endPoints = endPoints ;
181
+ this .namespace = namespace ;
182
+ this .tag = tag ;
183
+ this .filterExpressionType = filterExpressionType ;
184
+ this .requestTimeout = requestTimeout ;
185
+ this .awaitDuration = awaitDuration ;
186
+ this .sslEnabled = sslEnabled ;
187
+ }
188
+
189
+ @ Override public String toString () {
190
+ return "SimpleConsumerInfo{" +
191
+ "consumerGroup='" + consumerGroup + '\'' +
192
+ ", topicName='" + topicName + '\'' +
193
+ ", endPoints='" + endPoints + '\'' +
194
+ ", namespace='" + namespace + '\'' +
195
+ ", tag='" + tag + '\'' +
196
+ ", filterExpressionType='" + filterExpressionType + '\'' +
197
+ ", requestTimeout=" + requestTimeout +
198
+ ", awaitDuration=" + awaitDuration +
199
+ ", sslEnabled=" + sslEnabled +
200
+ '}' ;
201
+ }
202
+ }
154
203
}
0 commit comments