Timeout param may influence the batch consumer message result #242

Closed
@GongZhengMe

Description

I wrote a demo for batch messages and I think I may have found a bug.
Please look at the two demos below; they show the problem.
There is one consumer demo and two producer demos. The only difference between the producer demos is whether the timeout param is passed to syncSend.

@Service
@RocketMQMessageListener(topic = "msgBatchTopic",consumerGroup ="message-batch-consumer")
public class BatchMsgConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        System.out.printf("------- MessageBatchConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
    }
}

The first producer demo calls syncSend without the timeout param:

 /**
     * Send a batch of messages with RocketMQ, without the timeout param
     *
     * @author gongzheng
     * @date 2020/3/16
     */
    @RequestMapping("BatchSend")
    public void batchSend() {
        List<Message> msgs = new ArrayList<Message>();
        for (int i = 0; i < 10; i++) {
            msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                    setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
        }

        SendResult sr = rocketMQTemplate.syncSend("msgBatchTopic", msgs);

        System.out.printf("--- Batch messages send result :" + sr + "\n");
    }

Console output:

--- Batch messages send result :SendResult [sendStatus=SEND_OK, msgId=FE80000000000000431374F820C47CCC000018B4AAC27F0F62FF000A, offsetMsgId=C0A82B6900002A9F000000000001BCC3, messageQueue=MessageQueue [topic=msgBatchTopic, brokerName=localhost, queueId=0], queueOffset=2]
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F0F62FF000A, body:[{"payload":"Hello RocketMQ Batch Msg#0","headers":{"KEYS":"KEY_0","id":"a09d42c5-48fb-c0a2-59e2-1255944037b7","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#1","headers":{"KEYS":"KEY_1","id":"90430362-ecbb-32da-4276-a2badd75c3de","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#2","headers":{"KEYS":"KEY_2","id":"4d3ed9d1-2d5b-73a5-11cf-ec12ccbd7693","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#3","headers":{"KEYS":"KEY_3","id":"5eaeca40-887e-36c8-f60a-7256344b9885","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#4","headers":{"KEYS":"KEY_4","id":"284dd59e-2ca9-69f3-5d9f-306e534a6cd5","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#5","headers":{"KEYS":"KEY_5","id":"8115e99f-01fb-d2b6-ab45-8a437c6146d2","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#6","headers":{"KEYS":"KEY_6","id":"82618c85-ec5b-2e26-267f-3e1df517b1e0","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#7","headers":{"KEYS":"KEY_7","id":"a1914316-5197-dbae-34e4-f4dc22476a0d","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#8","headers":{"KEYS":"KEY_8","id":"fe90e385-d92b-54fb-c7b2-c031e2f952ac","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#9","headers":{"KEYS":"KEY_9","id":"fcca974c-74d3-c797-1750-aa2c385e73cb","timestamp":1585123714814}}] 

The consumer consumes only one message, and the body of that message contains all ten payloads.

The second producer demo calls syncSend with the timeout param:

  @RequestMapping("BatchSend")
    public void batchSend() {
        List<Message> msgs = new ArrayList<Message>();
        for (int i = 0; i < 10; i++) {
            msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                    setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
        }

        SendResult sr = rocketMQTemplate.syncSend("msgBatchTopic", msgs,60000);

        System.out.printf("--- Batch messages send result :" + sr + "\n");
    }

Console output:

--- Batch messages send result :SendResult [sendStatus=SEND_OK, msgId=FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0000,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0001,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0002,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0003,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0004,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0005,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0006,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0007,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0008,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0009, offsetMsgId=C0A82B6900002A9F000000000001C520,C0A82B6900002A9F000000000001C65F,C0A82B6900002A9F000000000001C79E,C0A82B6900002A9F000000000001C8DD,C0A82B6900002A9F000000000001CA1C,C0A82B6900002A9F000000000001CB5B,C0A82B6900002A9F000000000001CC9A,C0A82B6900002A9F000000000001CDD9,C0A82B6900002A9F000000000001CF18,C0A82B6900002A9F000000000001D057, messageQueue=MessageQueue [topic=msgBatchTopic, brokerName=localhost, queueId=1], queueOffset=12]
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0000, body:Hello RocketMQ Batch Msg#0 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0001, body:Hello RocketMQ Batch Msg#1 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0002, body:Hello RocketMQ Batch Msg#2 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0003, body:Hello RocketMQ Batch Msg#3 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0004, body:Hello RocketMQ Batch Msg#4 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0006, body:Hello RocketMQ Batch Msg#6 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0005, body:Hello RocketMQ Batch Msg#5 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0007, body:Hello RocketMQ Batch Msg#7 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0008, body:Hello RocketMQ Batch Msg#8 
------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0009, body:Hello RocketMQ Batch Msg#9 

This time the consumer consumes ten separate messages.
I think the timeout param should not change how many messages the consumer receives, so I think this is a bug.
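One possible explanation, not confirmed here against the rocketmq-spring source, is Java overload resolution: if the template only offers a batch overload that takes a timeout, a two-argument call with a List falls back to a generic Object-payload overload and the whole list is converted into one message body. The sketch below illustrates that mechanism only. The class `OverloadDemo` and both `send` methods are hypothetical stand-ins, not the library's API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class OverloadDemo {
    // Hypothetical stand-in for an overload that accepts any payload object.
    public static String send(String destination, Object payload) {
        return "one message";
    }

    // Hypothetical stand-in for a batch overload that only exists with a timeout.
    public static <T> String send(String destination, Collection<T> messages, long timeout) {
        return "batch of " + messages.size() + " messages";
    }

    public static void main(String[] args) {
        List<String> msgs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            msgs.add("Hello RocketMQ Batch Msg#" + i);
        }
        // Two arguments: only the Object overload has a matching arity,
        // so the list is treated as a single payload.
        System.out.println(send("msgBatchTopic", msgs));
        // Three arguments: the Collection overload is selected.
        System.out.println(send("msgBatchTopic", msgs, 60000));
    }
}
```

If this is what happens in the template, the observed difference would come from which overload the compiler picks rather than from the timeout value itself.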
One more thing: RocketMQ provides a way to split a large batch of messages, and I don't know how to use it with rocketmq-spring.
This is the code:

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
    
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       //handle the error
   }
}
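The ListSplitter above reads getTopic(), getBody() and getProperties(), so it cannot be applied directly to the message objects built with MessageBuilder in the producer demos. A minimal sketch of the same splitting idea, made generic over the element type with a caller-supplied size function, could look like this. The class name `SizeBoundedSplitter` and the size function are assumptions for illustration; how to estimate the size of a real message is left to the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.ToIntFunction;

public class SizeBoundedSplitter<T> implements Iterator<List<T>> {
    private final List<T> items;
    private final ToIntFunction<T> sizeOf; // caller-supplied size estimate per item
    private final int sizeLimit;
    private int currIndex;

    public SizeBoundedSplitter(List<T> items, ToIntFunction<T> sizeOf, int sizeLimit) {
        this.items = items;
        this.sizeOf = sizeOf;
        this.sizeLimit = sizeLimit;
    }

    @Override
    public boolean hasNext() {
        return currIndex < items.size();
    }

    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        int nextIndex = currIndex;
        int totalSize = 0;
        while (nextIndex < items.size()) {
            int size = sizeOf.applyAsInt(items.get(nextIndex));
            // Always take at least one item, so an oversized item
            // ends up in a chunk of its own instead of blocking the split.
            if (nextIndex > currIndex && totalSize + size > sizeLimit) {
                break;
            }
            totalSize += size;
            nextIndex++;
        }
        List<T> chunk = new ArrayList<>(items.subList(currIndex, nextIndex));
        currIndex = nextIndex;
        return chunk;
    }

    public static void main(String[] args) {
        List<String> payloads = Arrays.asList("aaaa", "bb", "cccccc", "d");
        SizeBoundedSplitter<String> splitter =
                new SizeBoundedSplitter<>(payloads, String::length, 6);
        while (splitter.hasNext()) {
            System.out.println(splitter.next());
        }
    }
}
```

Each chunk could then be passed to a batch send call such as the `rocketMQTemplate.syncSend("msgBatchTopic", chunk, 60000)` call from the second producer demo, assuming the size function approximates what the broker counts toward its limit.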

Labels: bug
