File tree Expand file tree Collapse file tree 1 file changed +21
-5
lines changed
rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source Expand file tree Collapse file tree 1 file changed +21
-5
lines changed Original file line number Diff line number Diff line change @@ -287,13 +287,17 @@ protected void setOffsetStore(DefaultMQPushConsumer consumer) {
287
287
}
288
288
MQClientInstance mQClientFactory = MQClientManager .getInstance ().getAndCreateMQClientInstance (defaultMQPushConsumer .getDefaultMQPushConsumer ());
289
289
RemoteBrokerOffsetStore offsetStore = new RemoteBrokerOffsetStore (mQClientFactory , NamespaceUtil .wrapNamespace (consumer .getNamespace (), consumer .getConsumerGroup ())) {
290
-
290
+ Set < MessageQueue > firstComing = new HashSet <>();
291
291
@ Override
292
292
public void removeOffset (MessageQueue mq ) {
293
- Set <String > splitIds = new HashSet <>();
294
- splitIds .add (new RocketMQMessageQueue (mq ).getQueueId ());
295
- removeSplit (splitIds );
296
- super .removeOffset (mq );
293
+ if (!firstComing .contains (mq )){
294
+ firstComing .add (mq );
295
+ } else {
296
+ Set <String > splitIds = new HashSet <>();
297
+ splitIds .add (new RocketMQMessageQueue (mq ).getQueueId ());
298
+ removeSplit (splitIds );
299
+ super .removeOffset (mq );
300
+ }
297
301
}
298
302
299
303
@ Override
@@ -398,4 +402,16 @@ public String getConsumerOffset() {
398
402
public void setConsumerOffset (String consumerOffset ) {
399
403
this .consumerOffset = consumerOffset ;
400
404
}
405
+
406
+ public String getStrategyName () {
407
+ return strategyName ;
408
+ }
409
+
410
+ public void setStrategyName (String strategyName ) {
411
+ this .strategyName = strategyName ;
412
+ }
413
+
414
+ public void setConsumer (DefaultMQPushConsumer consumer ) {
415
+ this .consumer = consumer ;
416
+ }
401
417
}
You can’t perform that action at this time.
0 commit comments