Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e034ded

Browse files
author
accelerated
committedMay 11, 2018
Added test case for the polling strategy; refactored the strategy class.
1 parent b2a2207 commit e034ded

22 files changed

+1167
-576
lines changed
 

‎include/cppkafka/consumer.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
116116
Consumer& operator=(Consumer&&) = delete;
117117

118118
/**
119-
* \brief Closes and estroys the rdkafka handle
119+
* \brief Closes and destroys the rdkafka handle
120120
*
121121
* This will call Consumer::close before destroying the handle
122122
*/

‎include/cppkafka/cppkafka.h‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
#include <cppkafka/utils/buffered_producer.h>
5757
#include <cppkafka/utils/compacted_topic_processor.h>
5858
#include <cppkafka/utils/consumer_dispatcher.h>
59-
#include <cppkafka/utils/roundrobin_poll_adapter.h>
59+
#include <cppkafka/utils/poll_interface.h>
60+
#include <cppkafka/utils/poll_strategy_base.h>
61+
#include <cppkafka/utils/roundrobin_poll_strategy.h>
6062

6163
#endif

‎include/cppkafka/kafka_handle_base.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class CPPKAFKA_API KafkaHandleBase {
7979
/**
8080
* \brief Resumes consumption/production from the given topic/partition list
8181
*
82-
* This translates into a call to rd_kafka_resume_partitions
82+
* This translates into a call to rd_kafka_resume_partitions
8383
*
8484
* \param topic_partitions The topic/partition list to resume consuming/producing from/to
8585
*/

‎include/cppkafka/utils/consumer_dispatcher.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ class CPPKAFKA_API BasicConsumerDispatcher {
238238
}
239239

240240
// Finds the first functor that accepts the parameters in a tuple and returns it. If no
241-
// such functor is found, a static asertion will occur
241+
// such functor is found, a static assertion will occur
242242
template <typename Tuple, typename... Functors>
243243
const typename find_type<Tuple, Functors...>::type&
244244
find_matching_functor(const Functors&... functors) {
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_POLL_INTERFACE_H
31+
#define CPPKAFKA_POLL_INTERFACE_H
32+
33+
#include "../consumer.h"
34+
35+
namespace cppkafka {
36+
37+
/**
38+
* \interface PollInterface
39+
*
40+
* \brief Interface defining polling methods for the Consumer class
41+
*/
42+
struct PollInterface {
43+
virtual ~PollInterface() = default;
44+
45+
/**
46+
* \brief Get the underlying consumer controlled by this strategy
47+
*
48+
* \return A reference to the consumer instance
49+
*/
50+
virtual Consumer& get_consumer() = 0;
51+
52+
/**
53+
* \brief Sets the timeout for polling functions
54+
*
55+
* This calls Consumer::set_timeout
56+
*
57+
* \param timeout The timeout to be set
58+
*/
59+
virtual void set_timeout(std::chrono::milliseconds timeout) = 0;
60+
61+
/**
62+
* \brief Gets the timeout for polling functions
63+
*
64+
* This calls Consumer::get_timeout
65+
*
66+
* \return The timeout
67+
*/
68+
virtual std::chrono::milliseconds get_timeout() = 0;
69+
70+
/**
71+
* \brief Polls all assigned partitions for new messages in round-robin fashion
72+
*
73+
* Each call to poll() will first consume from the global event queue and if there are
74+
* no pending events, will attempt to consume from all partitions until a valid message is found.
75+
 * The timeout used on this call will be the one configured via PollInterface::set_timeout.
76+
*
77+
* \return A message. The returned message *might* be empty. It's necessary to check
78+
* that it's a valid one before using it (see example above).
79+
*
80+
* \remark You need to call poll() or poll_batch() periodically as a keep alive mechanism,
81+
* otherwise the broker will think this consumer is down and will trigger a rebalance
82+
* (if using dynamic subscription)
83+
*/
84+
virtual Message poll() = 0;
85+
86+
/**
87+
* \brief Polls for new messages
88+
*
89+
 * Same as the other overload of PollInterface::poll but the provided
90+
* timeout will be used instead of the one configured on this Consumer.
91+
*
92+
* \param timeout The timeout to be used on this call
93+
*/
94+
virtual Message poll(std::chrono::milliseconds timeout) = 0;
95+
96+
/**
97+
* \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
98+
*
99+
* Each call to poll_batch() will first attempt to consume from the global event queue
100+
* and if the maximum batch number has not yet been filled, will attempt to fill it by
101+
* reading the remaining messages from each partition.
102+
*
103+
* \param max_batch_size The maximum amount of messages expected
104+
*
105+
* \return A list of messages
106+
*
107+
* \remark You need to call poll() or poll_batch() periodically as a keep alive mechanism,
108+
* otherwise the broker will think this consumer is down and will trigger a rebalance
109+
* (if using dynamic subscription)
110+
*/
111+
virtual MessageList poll_batch(size_t max_batch_size) = 0;
112+
113+
/**
114+
* \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
115+
*
116+
 * Same as the other overload of PollInterface::poll_batch but the provided
117+
* timeout will be used instead of the one configured on this Consumer.
118+
*
119+
* \param max_batch_size The maximum amount of messages expected
120+
*
121+
* \param timeout The timeout for this operation
122+
*
123+
* \return A list of messages
124+
*/
125+
virtual MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout) = 0;
126+
};
127+
128+
} //cppkafka
129+
130+
#endif //CPPKAFKA_POLL_INTERFACE_H
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_POLL_STRATEGY_BASE_H
31+
#define CPPKAFKA_POLL_STRATEGY_BASE_H
32+
33+
#include <map>
34+
#include <boost/any.hpp>
35+
#include "../queue.h"
36+
#include "../topic_partition_list.h"
37+
#include "poll_interface.h"
38+
39+
namespace cppkafka {
40+
41+
/**
42+
* \brief Contains a partition queue and generic metadata which can be used to store
43+
* related (user-specific) information.
44+
*/
45+
struct QueueData {
46+
Queue queue_;
47+
boost::any metadata_;
48+
};
49+
50+
/**
51+
* \class PollStrategyBase
52+
*
53+
* \brief Base implementation of the PollInterface
54+
*/
55+
class PollStrategyBase : public PollInterface
56+
{
57+
public:
58+
using QueueMap = std::map<TopicPartition, QueueData>;
59+
60+
/**
61+
* \brief Constructor
62+
*
63+
* \param consumer A reference to the polled consumer instance
64+
*/
65+
explicit PollStrategyBase(Consumer& consumer);
66+
67+
/**
68+
* \brief Destructor
69+
*/
70+
~PollStrategyBase();
71+
72+
/**
73+
* \sa PollInterface::set_timeout
74+
*/
75+
void set_timeout(std::chrono::milliseconds timeout) override;
76+
77+
/**
78+
* \sa PollInterface::get_timeout
79+
*/
80+
std::chrono::milliseconds get_timeout() override;
81+
82+
/**
83+
* \sa PollInterface::get_consumer
84+
*/
85+
Consumer& get_consumer() final;
86+
87+
protected:
88+
/**
89+
* \brief Get the queues from all assigned partitions
90+
*
91+
* \return A map of queues indexed by partition
92+
*/
93+
QueueMap& get_partition_queues();
94+
95+
/**
96+
* \brief Get the main consumer queue which services the underlying Consumer object
97+
*
98+
* \return The consumer queue
99+
*/
100+
QueueData& get_consumer_queue();
101+
102+
/**
103+
* \brief Return the next queue to be processed
104+
*
105+
 * Depending on the polling strategy, each implementation must define its own algorithm for
106+
* determining the next queue to poll.
107+
*
108+
* \return A partition queue
109+
*/
110+
virtual QueueData& get_next_queue() = 0;
111+
112+
/**
113+
* \brief Reset the internal state of the queues.
114+
*
115+
* Use this function to reset the state of any polling strategy or algorithm.
116+
*
117+
 * \remark This function gets called by on_assignment(), on_revocation() and on_rebalance_error()
118+
*/
119+
virtual void reset_state();
120+
121+
/**
122+
* \brief Function to be called when a new partition assignment takes place
123+
*
124+
* This method contains a default implementation. It adds all the new queues belonging
125+
* to the provided partition list and calls reset_state().
126+
*
127+
* \param partitions Assigned topic partitions
128+
*/
129+
virtual void on_assignment(TopicPartitionList& partitions);
130+
131+
/**
132+
* \brief Function to be called when an old partition assignment gets revoked
133+
*
134+
* This method contains a default implementation. It removes all the queues
135+
* belonging to the provided partition list and calls reset_state().
136+
*
137+
* \param partitions Revoked topic partitions
138+
*/
139+
virtual void on_revocation(const TopicPartitionList& partitions);
140+
141+
/**
142+
* \brief Function to be called when a topic rebalance error happens
143+
*
144+
* This method contains a default implementation. Calls reset_state().
145+
*
146+
* \param error The rebalance error
147+
*/
148+
virtual void on_rebalance_error(Error error);
149+
150+
private:
151+
Consumer& consumer_;
152+
QueueData consumer_queue_;
153+
QueueMap partition_queues_;
154+
Consumer::AssignmentCallback assignment_callback_;
155+
Consumer::RevocationCallback revocation_callback_;
156+
Consumer::RebalanceErrorCallback rebalance_error_callback_;
157+
};
158+
159+
} //cppkafka
160+
161+
#endif //CPPKAFKA_POLL_STRATEGY_BASE_H

‎include/cppkafka/utils/roundrobin_poll_adapter.h‎

Lines changed: 0 additions & 209 deletions
This file was deleted.
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H
31+
#define CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H
32+
33+
#include <map>
34+
#include <string>
35+
#include "../exceptions.h"
36+
#include "../consumer.h"
37+
#include "../queue.h"
38+
#include "poll_strategy_base.h"
39+
40+
namespace cppkafka {
41+
42+
/**
43+
 * \brief This class changes the default polling strategy of the Consumer into a fair round-robin
44+
* polling mechanism.
45+
*
46+
* The default librdkafka (and cppkafka) poll() and poll_batch() behavior is to consume batches of
47+
* messages from each partition in turn. For performance reasons, librdkafka pre-fetches batches
48+
* of messages from the kafka broker (one batch from each partition), and stores them locally in
49+
* partition queues. Since all the internal partition queues are forwarded by default unto the
50+
* group consumer queue (one per consumer), these batches end up being polled and consumed in the
51+
* same sequence order.
52+
 * This strategy allows fair round-robin polling of all assigned partitions, one message at a time
53+
* (or one batch at a time if poll_batch() is used). Note that poll_batch() has nothing to do with
54+
* the internal batching mechanism of librdkafka.
55+
*
56+
* Example code on how to use this:
57+
*
58+
* \code
59+
* // Create a consumer
60+
* Consumer consumer(...);
61+
* consumer.subscribe({ "my_topic" });
62+
*
63+
* // Optionally set the callbacks. This must be done *BEFORE* creating the adapter
64+
* consumer.set_assignment_callback(...);
65+
* consumer.set_revocation_callback(...);
66+
* consumer.set_rebalance_error_callback(...);
67+
*
68+
* // Create the adapter and use it for polling
69+
* RoundRobinPollStrategy adapter(consumer);
70+
*
71+
* while (true) {
72+
* // Poll each partition in turn
73+
* Message msg = adapter.poll();
74+
* if (msg) {
75+
* // process valid message
76+
* }
77+
* }
78+
 *
79+
* \endcode
80+
*
81+
 * \warning Calling poll() or poll_batch() directly on the Consumer object while using this strategy will
82+
 * lead to undesired results since the RoundRobinPollStrategy modifies the internal queuing mechanism of
83+
* the Consumer instance it owns.
84+
*/
85+
86+
class RoundRobinPollStrategy : public PollStrategyBase
87+
{
88+
public:
89+
RoundRobinPollStrategy(Consumer& consumer);
90+
91+
~RoundRobinPollStrategy();
92+
93+
/**
94+
* \sa PollInterface::poll
95+
*/
96+
Message poll() override;
97+
98+
/**
99+
* \sa PollInterface::poll
100+
*/
101+
Message poll(std::chrono::milliseconds timeout) override;
102+
103+
/**
104+
* \sa PollInterface::poll_batch
105+
*/
106+
MessageList poll_batch(size_t max_batch_size) override;
107+
108+
/**
109+
* \sa PollInterface::poll_batch
110+
*/
111+
MessageList poll_batch(size_t max_batch_size,
112+
std::chrono::milliseconds timeout) override;
113+
114+
protected:
115+
/**
116+
* \sa PollStrategyBase::get_next_queue
117+
*/
118+
QueueData& get_next_queue() final;
119+
120+
/**
121+
* \sa PollStrategyBase::reset_state
122+
*/
123+
void reset_state() final;
124+
125+
private:
126+
void consume_batch(Queue& queue,
127+
MessageList& messages,
128+
ssize_t& count,
129+
std::chrono::milliseconds timeout);
130+
131+
void restore_forwarding();
132+
133+
// Members
134+
QueueMap::iterator queue_iter_;
135+
};
136+
137+
} //cppkafka
138+
139+
#endif //CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H

‎src/CMakeLists.txt‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ set(SOURCES
1919

2020
utils/backoff_performer.cpp
2121
utils/backoff_committer.cpp
22-
utils/roundrobin_poll_adapter.cpp
22+
utils/poll_strategy_base.cpp
23+
utils/roundrobin_poll_strategy.cpp
2324
)
2425

2526
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka)

‎src/utils/poll_strategy_base.cpp‎

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#include "utils/poll_strategy_base.h"
31+
#include "consumer.h"
32+
33+
using std::chrono::milliseconds;
34+
35+
namespace cppkafka {
36+
37+
PollStrategyBase::PollStrategyBase(Consumer& consumer)
38+
: consumer_(consumer),
39+
consumer_queue_(QueueData{consumer.get_consumer_queue(), boost::any()}) {
40+
// get all currently active partition assignments
41+
TopicPartitionList assignment = consumer_.get_assignment();
42+
on_assignment(assignment);
43+
44+
// take over the assignment callback
45+
assignment_callback_ = consumer.get_assignment_callback();
46+
consumer_.set_assignment_callback([this](TopicPartitionList& partitions) {
47+
on_assignment(partitions);
48+
});
49+
// take over the revocation callback
50+
revocation_callback_ = consumer.get_revocation_callback();
51+
consumer_.set_revocation_callback([this](const TopicPartitionList& partitions) {
52+
on_revocation(partitions);
53+
});
54+
// take over the rebalance error callback
55+
rebalance_error_callback_ = consumer.get_rebalance_error_callback();
56+
consumer_.set_rebalance_error_callback([this](Error error) {
57+
on_rebalance_error(error);
58+
});
59+
}
60+
61+
PollStrategyBase::~PollStrategyBase() {
62+
// restore the original callbacks taken over in the constructor
63+
consumer_.set_assignment_callback(assignment_callback_);
64+
consumer_.set_revocation_callback(revocation_callback_);
65+
consumer_.set_rebalance_error_callback(rebalance_error_callback_);
66+
}
67+
68+
void PollStrategyBase::set_timeout(milliseconds timeout) {
69+
consumer_.set_timeout(timeout);
70+
}
71+
72+
milliseconds PollStrategyBase::get_timeout() {
73+
return consumer_.get_timeout();
74+
}
75+
76+
Consumer& PollStrategyBase::get_consumer() {
77+
return consumer_;
78+
}
79+
80+
QueueData& PollStrategyBase::get_consumer_queue() {
81+
return consumer_queue_;
82+
}
83+
84+
PollStrategyBase::QueueMap& PollStrategyBase::get_partition_queues() {
85+
return partition_queues_;
86+
}
87+
88+
void PollStrategyBase::reset_state() {
89+
90+
}
91+
92+
void PollStrategyBase::on_assignment(TopicPartitionList& partitions) {
93+
// populate partition queues
94+
for (const auto& partition : partitions) {
95+
// get the queue associated with this partition
96+
partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()});
97+
}
98+
reset_state();
99+
// call original consumer callback if any
100+
if (assignment_callback_) {
101+
assignment_callback_(partitions);
102+
}
103+
}
104+
105+
void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) {
106+
for (const auto& partition : partitions) {
107+
// get the queue associated with this partition
108+
auto toppar_it = partition_queues_.find(partition);
109+
if (toppar_it != partition_queues_.end()) {
110+
// remove this queue from the list
111+
partition_queues_.erase(toppar_it);
112+
}
113+
}
114+
reset_state();
115+
// call original consumer callback if any
116+
if (revocation_callback_) {
117+
revocation_callback_(partitions);
118+
}
119+
}
120+
121+
void PollStrategyBase::on_rebalance_error(Error error) {
122+
reset_state();
123+
// call original consumer callback if any
124+
if (rebalance_error_callback_) {
125+
rebalance_error_callback_(error);
126+
}
127+
}
128+
129+
} //cppkafka

‎src/utils/roundrobin_poll_adapter.cpp‎

Lines changed: 0 additions & 186 deletions
This file was deleted.
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#include "utils/roundrobin_poll_strategy.h"
31+
32+
using std::string;
33+
using std::chrono::milliseconds;
34+
using std::make_move_iterator;
35+
36+
namespace cppkafka {
37+
38+
RoundRobinPollStrategy::RoundRobinPollStrategy(Consumer& consumer)
39+
: PollStrategyBase(consumer) {
40+
reset_state();
41+
}
42+
43+
RoundRobinPollStrategy::~RoundRobinPollStrategy() {
44+
restore_forwarding();
45+
}
46+
47+
48+
Message RoundRobinPollStrategy::poll() {
49+
return poll(get_consumer().get_timeout());
50+
}
51+
52+
Message RoundRobinPollStrategy::poll(milliseconds timeout) {
53+
// Always give priority to group and global events
54+
Message message = get_consumer_queue().queue_.consume(milliseconds(0));
55+
if (message) {
56+
return message;
57+
}
58+
size_t num_queues = get_partition_queues().size();
59+
while (num_queues--) {
60+
//consume the next partition (non-blocking)
61+
message = get_next_queue().queue_.consume(milliseconds(0));
62+
if (message) {
63+
return message;
64+
}
65+
}
66+
// We still don't have a valid message so we block on the event queue
67+
return get_consumer_queue().queue_.consume(timeout);
68+
}
69+
70+
MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size) {
71+
return poll_batch(max_batch_size, get_consumer().get_timeout());
72+
}
73+
74+
MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, milliseconds timeout) {
75+
MessageList messages;
76+
ssize_t count = max_batch_size;
77+
78+
// batch from the group event queue first (non-blocking)
79+
consume_batch(get_consumer_queue().queue_, messages, count, milliseconds(0));
80+
size_t num_queues = get_partition_queues().size();
81+
while ((count > 0) && (num_queues--)) {
82+
// batch from the next partition (non-blocking)
83+
consume_batch(get_next_queue().queue_, messages, count, milliseconds(0));
84+
}
85+
// we still have space left in the buffer
86+
if (count > 0) {
87+
// wait on the event queue until timeout
88+
consume_batch(get_consumer_queue().queue_, messages, count, timeout);
89+
}
90+
return messages;
91+
}
92+
93+
void RoundRobinPollStrategy::consume_batch(Queue& queue,
94+
MessageList& messages,
95+
ssize_t& count,
96+
milliseconds timeout)
97+
{
98+
MessageList queue_messages = queue.consume_batch(count, timeout);
99+
if (queue_messages.empty()) {
100+
return;
101+
}
102+
// concatenate both lists
103+
messages.insert(messages.end(),
104+
make_move_iterator(queue_messages.begin()),
105+
make_move_iterator(queue_messages.end()));
106+
// reduce total batch count
107+
count -= queue_messages.size();
108+
}
109+
110+
111+
void RoundRobinPollStrategy::restore_forwarding() {
112+
// forward all partition queues
113+
for (const auto& toppar : get_partition_queues()) {
114+
toppar.second.queue_.forward_to_queue(get_consumer_queue().queue_);
115+
}
116+
}
117+
118+
QueueData& RoundRobinPollStrategy::get_next_queue() {
119+
if (get_partition_queues().empty()) {
120+
throw QueueException(RD_KAFKA_RESP_ERR__STATE);
121+
}
122+
if (++queue_iter_ == get_partition_queues().end()) {
123+
queue_iter_ = get_partition_queues().begin();
124+
}
125+
return queue_iter_->second;
126+
}
127+
128+
void RoundRobinPollStrategy::reset_state() {
129+
queue_iter_ = get_partition_queues().begin();
130+
}
131+
132+
} //cppkafka

‎tests/CMakeLists.txt‎

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,22 @@ set(KAFKA_TEST_INSTANCE "kafka-vm:9092"
77
add_custom_target(tests)
88

99
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
10-
add_library(cppkafka-test EXCLUDE_FROM_ALL test_utils.cpp)
11-
target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} pthread)
12-
1310
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
1411

1512
add_executable(
1613
cppkafka_tests
17-
EXCLUDE_FROM_ALL
1814
buffer_test.cpp
1915
compacted_topic_processor_test.cpp
2016
configuration_test.cpp
2117
topic_partition_list_test.cpp
2218
kafka_handle_base_test.cpp
2319
producer_test.cpp
2420
consumer_test.cpp
21+
roundrobin_poll_test.cpp
2522

2623
# Main file
2724
test_main.cpp
2825
)
29-
target_link_libraries(cppkafka_tests cppkafka-test)
26+
target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread)
3027
add_dependencies(tests cppkafka_tests)
3128
add_test(cppkafka cppkafka_tests)

‎tests/compacted_topic_processor_test.cpp‎

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "cppkafka/producer.h"
99
#include "cppkafka/consumer.h"
1010
#include "cppkafka/utils/compacted_topic_processor.h"
11+
#include "test_utils.h"
1112

1213
using std::string;
1314
using std::to_string;
@@ -29,8 +30,6 @@ using std::chrono::milliseconds;
2930

3031
using namespace cppkafka;
3132

32-
static const string KAFKA_TOPIC = "cppkafka_test1";
33-
3433
static Configuration make_producer_config() {
3534
Configuration config;
3635
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -65,7 +64,7 @@ TEST_CASE("consumption", "[consumer][compacted]") {
6564
compacted_consumer.set_event_handler([&](const Event& event) {
6665
events.push_back(event);
6766
});
68-
consumer.subscribe({ KAFKA_TOPIC });
67+
consumer.subscribe({ KAFKA_TOPICS[0] });
6968
consumer.poll();
7069
consumer.poll();
7170
consumer.poll();
@@ -82,13 +81,13 @@ TEST_CASE("consumption", "[consumer][compacted]") {
8281
};
8382
for (const auto& element_pair : elements) {
8483
const ElementType& element = element_pair.second;
85-
MessageBuilder builder(KAFKA_TOPIC);
84+
MessageBuilder builder(KAFKA_TOPICS[0]);
8685
builder.partition(element.partition).key(element_pair.first).payload(element.value);
8786
producer.produce(builder);
8887
}
8988
// Now erase the first element
9089
string deleted_key = "42";
91-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(0).key(deleted_key));
90+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(0).key(deleted_key));
9291

9392
for (size_t i = 0; i < 10; ++i) {
9493
compacted_consumer.process_event();

‎tests/consumer_test.cpp‎

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ using std::chrono::system_clock;
2929

3030
using namespace cppkafka;
3131

32-
const string KAFKA_TOPIC = "cppkafka_test1";
33-
3432
static Configuration make_producer_config() {
3533
Configuration config;
3634
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -54,31 +52,32 @@ TEST_CASE("message consumption", "[consumer]") {
5452
consumer.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
5553
assignment = topic_partitions;
5654
});
57-
consumer.subscribe({ KAFKA_TOPIC });
58-
ConsumerRunner runner(consumer, 1, 3);
55+
consumer.subscribe({ KAFKA_TOPICS[0] });
56+
ConsumerRunner runner(consumer, 1, KAFKA_NUM_PARTITIONS);
5957

6058
// Produce a message just so we stop the consumer
6159
Producer producer(make_producer_config());
6260
string payload = "Hello world!";
63-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
61+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
6462
runner.try_join();
6563

66-
// All 3 partitions should be ours
67-
REQUIRE(assignment.size() == 3);
68-
set<int> partitions = { 0, 1, 2 };
64+
// All partitions should be ours
65+
REQUIRE(assignment.size() == KAFKA_NUM_PARTITIONS);
66+
set<int> partitions;
67+
for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace(i++));
6968
for (const auto& topic_partition : assignment) {
70-
CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
69+
CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
7170
CHECK(partitions.erase(topic_partition.get_partition()) == true);
7271
}
7372
REQUIRE(runner.get_messages().size() == 1);
74-
CHECK(consumer.get_subscription() == vector<string>{ KAFKA_TOPIC });
73+
CHECK(consumer.get_subscription() == vector<string>{ KAFKA_TOPICS[0] });
7574

7675
assignment = consumer.get_assignment();
77-
CHECK(assignment.size() == 3);
76+
CHECK(assignment.size() == KAFKA_NUM_PARTITIONS);
7877

7978
int64_t low;
8079
int64_t high;
81-
tie(low, high) = consumer.get_offsets({ KAFKA_TOPIC, partition });
80+
tie(low, high) = consumer.get_offsets({ KAFKA_TOPICS[0], partition });
8281
CHECK(high > low);
8382
CHECK(runner.get_messages().back().get_offset() + 1 == high);
8483
}
@@ -97,35 +96,36 @@ TEST_CASE("consumer rebalance", "[consumer]") {
9796
consumer1.set_revocation_callback([&](const TopicPartitionList&) {
9897
revocation_called = true;
9998
});
100-
consumer1.subscribe({ KAFKA_TOPIC });
101-
ConsumerRunner runner1(consumer1, 1, 3);
99+
consumer1.subscribe({ KAFKA_TOPICS[0] });
100+
ConsumerRunner runner1(consumer1, 1, KAFKA_NUM_PARTITIONS);
102101

103102
// Create a second consumer and subscribe to the topic
104103
Consumer consumer2(make_consumer_config());
105104
consumer2.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
106105
assignment2 = topic_partitions;
107106
});
108-
consumer2.subscribe({ KAFKA_TOPIC });
107+
consumer2.subscribe({ KAFKA_TOPICS[0] });
109108
ConsumerRunner runner2(consumer2, 1, 1);
110109

111110
CHECK(revocation_called == true);
112111

113112
// Produce a message just so we stop the consumer
114113
Producer producer(make_producer_config());
115114
string payload = "Hello world!";
116-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
115+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
117116
runner1.try_join();
118117
runner2.try_join();
119118

120-
// All 3 partitions should be assigned
121-
CHECK(assignment1.size() + assignment2.size() == 3);
122-
set<int> partitions = { 0, 1, 2 };
119+
// All partitions should be assigned
120+
CHECK(assignment1.size() + assignment2.size() == KAFKA_NUM_PARTITIONS);
121+
set<int> partitions;
122+
for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace(i++));
123123
for (const auto& topic_partition : assignment1) {
124-
CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
124+
CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
125125
CHECK(partitions.erase(topic_partition.get_partition()) == true);
126126
}
127127
for (const auto& topic_partition : assignment2) {
128-
CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
128+
CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
129129
CHECK(partitions.erase(topic_partition.get_partition()) == true);
130130
}
131131
CHECK(runner1.get_messages().size() + runner2.get_messages().size() == 1);
@@ -143,18 +143,18 @@ TEST_CASE("consumer offset commit", "[consumer]") {
143143
offset_commit_called = true;
144144
CHECK(!!error == false);
145145
REQUIRE(topic_partitions.size() == 1);
146-
CHECK(topic_partitions[0].get_topic() == KAFKA_TOPIC);
146+
CHECK(topic_partitions[0].get_topic() == KAFKA_TOPICS[0]);
147147
CHECK(topic_partitions[0].get_partition() == 0);
148148
CHECK(topic_partitions[0].get_offset() == message_offset + 1);
149149
});
150150
Consumer consumer(config);
151-
consumer.assign({ { KAFKA_TOPIC, 0 } });
151+
consumer.assign({ { KAFKA_TOPICS[0], 0 } });
152152
ConsumerRunner runner(consumer, 1, 1);
153153

154154
// Produce a message just so we stop the consumer
155155
Producer producer(make_producer_config());
156156
string payload = "Hello world!";
157-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
157+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
158158
runner.try_join();
159159

160160
REQUIRE(runner.get_messages().size() == 1);
@@ -173,7 +173,7 @@ TEST_CASE("consumer throttle", "[consumer]") {
173173
// Create a consumer and subscribe to the topic
174174
Configuration config = make_consumer_config("offset_commit");
175175
Consumer consumer(config);
176-
consumer.assign({ { KAFKA_TOPIC, 0 } });
176+
consumer.assign({ { KAFKA_TOPICS[0], 0 } });
177177

178178
{
179179
ConsumerRunner runner(consumer, 0, 1);
@@ -183,7 +183,7 @@ TEST_CASE("consumer throttle", "[consumer]") {
183183
// Produce a message just so we stop the consumer
184184
BufferedProducer<string> producer(make_producer_config());
185185
string payload = "Hello world!";
186-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
186+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
187187
producer.flush();
188188

189189
size_t callback_executed_count = 0;
@@ -213,7 +213,7 @@ TEST_CASE("consume batch", "[consumer]") {
213213
// Create a consumer and subscribe to the topic
214214
Configuration config = make_consumer_config("test");
215215
Consumer consumer(config);
216-
consumer.assign({ { KAFKA_TOPIC, 0 } });
216+
consumer.assign({ { KAFKA_TOPICS[0], 0 } });
217217

218218
{
219219
ConsumerRunner runner(consumer, 0, 1);
@@ -224,8 +224,8 @@ TEST_CASE("consume batch", "[consumer]") {
224224
BufferedProducer<string> producer(make_producer_config());
225225
string payload = "Hello world!";
226226
// Produce it twice
227-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
228-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
227+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
228+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
229229
producer.flush();
230230

231231
MessageList all_messages;

‎tests/kafka_handle_base_test.cpp‎

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ using std::string;
1414

1515
using namespace cppkafka;
1616

17-
static const string KAFKA_TOPIC = "cppkafka_test1";
18-
1917
Configuration make_config() {
2018
Configuration config;
2119
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -45,6 +43,9 @@ uint16_t get_kafka_port() {
4543
}
4644

4745
TEST_CASE("metadata", "[handle_base]") {
46+
if (KAFKA_TOPICS.size() < 2) {
47+
return; //skip test
48+
}
4849
Producer producer({});
4950
producer.add_brokers(KAFKA_TEST_INSTANCE);
5051
Metadata metadata = producer.get_metadata();
@@ -59,7 +60,7 @@ TEST_CASE("metadata", "[handle_base]") {
5960
}
6061

6162
SECTION("topics") {
62-
unordered_set<string> topic_names = { "cppkafka_test1", "cppkafka_test2" };
63+
unordered_set<string> topic_names = { KAFKA_TOPICS[0], KAFKA_TOPICS[1] };
6364
size_t found_topics = 0;
6465

6566
const vector<TopicMetadata>& topics = metadata.get_topics();
@@ -68,8 +69,9 @@ TEST_CASE("metadata", "[handle_base]") {
6869
for (const auto& topic : topics) {
6970
if (topic_names.count(topic.get_name()) == 1) {
7071
const vector<PartitionMetadata>& partitions = topic.get_partitions();
71-
REQUIRE(partitions.size() == 3);
72-
set<int32_t> expected_ids = { 0, 1, 2 };
72+
REQUIRE(partitions.size() == KAFKA_NUM_PARTITIONS);
73+
set<int32_t> expected_ids;
74+
for (int i = 0; i < KAFKA_NUM_PARTITIONS; expected_ids.emplace(i++));
7375
for (const PartitionMetadata& partition : partitions) {
7476
REQUIRE(expected_ids.erase(partition.get_id()) == 1);
7577
for (int32_t replica : partition.get_replicas()) {
@@ -90,8 +92,8 @@ TEST_CASE("metadata", "[handle_base]") {
9092
CHECK(metadata.get_topics_prefixed("cppkafka_").size() == topic_names.size());
9193

9294
// Now get the whole metadata only for this topic
93-
Topic topic = producer.get_topic(KAFKA_TOPIC);
94-
CHECK(producer.get_metadata(topic).get_name() == KAFKA_TOPIC);
95+
Topic topic = producer.get_topic(KAFKA_TOPICS[0]);
96+
CHECK(producer.get_metadata(topic).get_name() == KAFKA_TOPICS[0]);
9597
}
9698
}
9799

@@ -106,7 +108,7 @@ TEST_CASE("consumer groups", "[handle_base]") {
106108

107109
// Build consumer
108110
Consumer consumer(config);
109-
consumer.subscribe({ KAFKA_TOPIC });
111+
consumer.subscribe({ KAFKA_TOPICS[0] });
110112
ConsumerRunner runner(consumer, 0, 3);
111113
runner.try_join();
112114

@@ -120,11 +122,8 @@ TEST_CASE("consumer groups", "[handle_base]") {
120122

121123
MemberAssignmentInformation assignment = member.get_member_assignment();
122124
CHECK(assignment.get_version() == 0);
123-
TopicPartitionList expected_topic_partitions = {
124-
{ KAFKA_TOPIC, 0 },
125-
{ KAFKA_TOPIC, 1 },
126-
{ KAFKA_TOPIC, 2 }
127-
};
125+
TopicPartitionList expected_topic_partitions;
126+
for (int i = 0; i < KAFKA_NUM_PARTITIONS; expected_topic_partitions.emplace_back(KAFKA_TOPICS[0], i++));
128127
TopicPartitionList topic_partitions = assignment.get_topic_partitions();
129128
sort(topic_partitions.begin(), topic_partitions.end());
130129
CHECK(topic_partitions == expected_topic_partitions);

‎tests/producer_test.cpp‎

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ using std::chrono::milliseconds;
2626

2727
using namespace cppkafka;
2828

29-
static const string KAFKA_TOPIC = "cppkafka_test1";
30-
3129
static Configuration make_producer_config() {
3230
Configuration config = {
3331
{ "metadata.broker.list", KAFKA_TEST_INSTANCE },
@@ -53,29 +51,29 @@ TEST_CASE("simple production", "[producer]") {
5351

5452
// Create a consumer and assign this topic/partition
5553
Consumer consumer(make_consumer_config());
56-
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
54+
consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
5755
ConsumerRunner runner(consumer, 1, 1);
5856

5957
Configuration config = make_producer_config();
6058
SECTION("message with no key") {
6159
// Now create a producer and produce a message
6260
const string payload = "Hello world! 1";
6361
Producer producer(config);
64-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
62+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
6563
runner.try_join();
6664

6765
const auto& messages = runner.get_messages();
6866
REQUIRE(messages.size() == 1);
6967
const auto& message = messages[0];
7068
CHECK(message.get_payload() == payload);
7169
CHECK(!!message.get_key() == false);
72-
CHECK(message.get_topic() == KAFKA_TOPIC);
70+
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
7371
CHECK(message.get_partition() == partition);
7472
CHECK(!!message.get_error() == false);
7573

7674
int64_t low;
7775
int64_t high;
78-
tie(low, high) = producer.query_offsets({ KAFKA_TOPIC, partition });
76+
tie(low, high) = producer.query_offsets({ KAFKA_TOPICS[0], partition });
7977
CHECK(high > low);
8078
}
8179

@@ -84,7 +82,7 @@ TEST_CASE("simple production", "[producer]") {
8482
const string key = "such key";
8583
const milliseconds timestamp{15};
8684
Producer producer(config);
87-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition)
85+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
8886
.key(key)
8987
.payload(payload)
9088
.timestamp(timestamp));
@@ -95,7 +93,7 @@ TEST_CASE("simple production", "[producer]") {
9593
const auto& message = messages[0];
9694
CHECK(message.get_payload() == payload);
9795
CHECK(message.get_key() == key);
98-
CHECK(message.get_topic() == KAFKA_TOPIC);
96+
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
9997
CHECK(message.get_partition() == partition);
10098
CHECK(!!message.get_error() == false);
10199
REQUIRE(!!message.get_timestamp() == true);
@@ -116,14 +114,14 @@ TEST_CASE("simple production", "[producer]") {
116114
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
117115
int32_t partition_count) {
118116
CHECK(msg_key == key);
119-
CHECK(partition_count == 3);
120-
CHECK(topic.get_name() == KAFKA_TOPIC);
117+
CHECK(partition_count == KAFKA_NUM_PARTITIONS);
118+
CHECK(topic.get_name() == KAFKA_TOPICS[0]);
121119
return 0;
122120
});
123121
config.set_default_topic_configuration(topic_config);
124122

125123
Producer producer(config);
126-
producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload));
124+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).key(key).payload(payload));
127125
while (producer.get_out_queue_length() > 0) {
128126
producer.poll();
129127
}
@@ -134,7 +132,7 @@ TEST_CASE("simple production", "[producer]") {
134132
const auto& message = messages[0];
135133
CHECK(message.get_payload() == payload);
136134
CHECK(message.get_key() == key);
137-
CHECK(message.get_topic() == KAFKA_TOPIC);
135+
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
138136
CHECK(message.get_partition() == partition);
139137
CHECK(!!message.get_error() == false);
140138
CHECK(delivery_report_called == true);
@@ -150,15 +148,15 @@ TEST_CASE("simple production", "[producer]") {
150148
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
151149
int32_t partition_count) {
152150
CHECK(msg_key == key);
153-
CHECK(partition_count == 3);
154-
CHECK(topic.get_name() == KAFKA_TOPIC);
151+
CHECK(partition_count == KAFKA_NUM_PARTITIONS);
152+
CHECK(topic.get_name() == KAFKA_TOPICS[0]);
155153
callback_called = true;
156154
return 0;
157155
});
158156
config.set_default_topic_configuration(topic_config);
159157
Producer producer(config);
160158

161-
producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload));
159+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).key(key).payload(payload));
162160
producer.poll();
163161
runner.try_join();
164162

@@ -172,33 +170,32 @@ TEST_CASE("simple production", "[producer]") {
172170

173171
TEST_CASE("multiple messages", "[producer]") {
174172
size_t message_count = 10;
175-
int partitions = 3;
176173
set<string> payloads;
177174

178175
// Create a consumer and subscribe to this topic
179176
Consumer consumer(make_consumer_config());
180-
consumer.subscribe({ KAFKA_TOPIC });
181-
ConsumerRunner runner(consumer, message_count, partitions);
177+
consumer.subscribe({ KAFKA_TOPICS[0] });
178+
ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS);
182179

183180
// Now create a producer and produce a message
184181
Producer producer(make_producer_config());
185182
const string payload_base = "Hello world ";
186183
for (size_t i = 0; i < message_count; ++i) {
187184
const string payload = payload_base + to_string(i);
188185
payloads.insert(payload);
189-
producer.produce(MessageBuilder(KAFKA_TOPIC).payload(payload));
186+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
190187
}
191188
runner.try_join();
192189

193190
const auto& messages = runner.get_messages();
194191
REQUIRE(messages.size() == message_count);
195192
for (const auto& message : messages) {
196-
CHECK(message.get_topic() == KAFKA_TOPIC);
193+
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
197194
CHECK(payloads.erase(message.get_payload()) == 1);
198195
CHECK(!!message.get_error() == false);
199196
CHECK(!!message.get_key() == false);
200197
CHECK(message.get_partition() >= 0);
201-
CHECK(message.get_partition() < 3);
198+
CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS);
202199
}
203200
}
204201

@@ -207,30 +204,30 @@ TEST_CASE("buffered producer", "[producer]") {
207204

208205
// Create a consumer and assign this topic/partition
209206
Consumer consumer(make_consumer_config());
210-
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
207+
consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
211208
ConsumerRunner runner(consumer, 3, 1);
212209

213210
// Now create a buffered producer and produce two messages
214211
BufferedProducer<string> producer(make_producer_config());
215212
const string payload = "Hello world! 2";
216213
const string key = "such key";
217-
producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition)
214+
producer.add_message(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
218215
.key(key)
219216
.payload(payload));
220-
producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
217+
producer.add_message(producer.make_builder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
221218
producer.flush();
222-
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
219+
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
223220
producer.wait_for_acks();
224221
// Add another one but then clear it
225-
producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
222+
producer.add_message(producer.make_builder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
226223
producer.clear();
227224
runner.try_join();
228225

229226
const auto& messages = runner.get_messages();
230227
REQUIRE(messages.size() == 3);
231228
const auto& message = messages[0];
232229
CHECK(message.get_key() == key);
233-
CHECK(message.get_topic() == KAFKA_TOPIC);
230+
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
234231
CHECK(message.get_partition() == partition);
235232
CHECK(!!message.get_error() == false);
236233

‎tests/roundrobin_poll_test.cpp‎

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
#include <vector>
2+
#include <thread>
3+
#include <set>
4+
#include <mutex>
5+
#include <chrono>
6+
#include <iterator>
7+
#include <condition_variable>
8+
#include <catch.hpp>
9+
#include <memory>
10+
#include <iostream>
11+
#include "cppkafka/cppkafka.h"
12+
#include "test_utils.h"
13+
14+
using std::vector;
15+
using std::move;
16+
using std::string;
17+
using std::thread;
18+
using std::set;
19+
using std::mutex;
20+
using std::tie;
21+
using std::condition_variable;
22+
using std::lock_guard;
23+
using std::unique_lock;
24+
using std::make_move_iterator;
25+
using std::chrono::seconds;
26+
using std::chrono::milliseconds;
27+
using std::chrono::system_clock;
28+
29+
using namespace cppkafka;
30+
31+
//========================================================================
32+
// TESTS
33+
//========================================================================
34+
35+
// Minimal producer configuration pointing at the test broker instance.
static Configuration make_producer_config() {
    return Configuration{
        { "metadata.broker.list", KAFKA_TEST_INSTANCE }
    };
}
40+
41+
// Consumer configuration with fast auto-commit enabled so offsets are
// persisted quickly during the short-lived test runs.
static Configuration make_consumer_config(const string& group_id = "rr_consumer_test") {
    Configuration cfg = {
        { "metadata.broker.list", KAFKA_TEST_INSTANCE },
        { "enable.auto.commit", true },
        { "enable.auto.offset.store", true },
        { "auto.commit.interval.ms", 100 },
        { "group.id", group_id }
    };
    return cfg;
}
50+
51+
TEST_CASE("serial consumer test", "[roundrobin consumer]") {
    // With the default (serial) polling, rdkafka tends to drain one partition
    // before moving to the next, so consumed messages arrive in runs that
    // share the same partition id.
    const int messages_per_partition = 3;
    const int total_messages = KAFKA_NUM_PARTITIONS * messages_per_partition;

    // Create a consumer and assign it every partition of the first test topic.
    Consumer consumer(make_consumer_config());
    TopicPartitionList partitions;
    for (int i = 0; i < KAFKA_NUM_PARTITIONS; ++i) {
        partitions.emplace_back(KAFKA_TOPICS[0], i);
    }
    consumer.assign(partitions);

    // Start polling on a background thread before producing anything.
    ConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);

    // Publish <messages_per_partition> messages into each partition.
    Producer producer(make_producer_config());
    const string payload = "Serial";
    for (int i = 0; i < total_messages; ++i) {
        producer.produce(MessageBuilder(KAFKA_TOPICS[0])
                             .partition(i % KAFKA_NUM_PARTITIONS)
                             .payload(payload));
    }
    producer.flush();
    runner.try_join();

    // Every produced message must have been consumed.
    REQUIRE(runner.get_messages().size() == total_messages);

    // Messages should come in consecutive groups of <messages_per_partition>
    // with identical partition ids.
    int expected_partition = -1;
    for (int i = 0; i < total_messages; ++i) {
        const Message& msg = runner.get_messages()[i];
        if ((i % messages_per_partition) == 0) {
            expected_partition = msg.get_partition();
        }
        REQUIRE(msg.get_partition() == expected_partition);
        REQUIRE((string)msg.get_payload() == payload);
    }
}
88+
89+
TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
    const int messages_per_partition = 3;
    const int total_messages = KAFKA_NUM_PARTITIONS * messages_per_partition;

    // Consumer with the round-robin polling strategy installed.
    RoundRobinConsumer consumer(make_consumer_config());
    TopicPartitionList partitions;
    for (int i = 0; i < KAFKA_NUM_PARTITIONS; ++i) {
        partitions.emplace_back(KAFKA_TOPICS[0], i);
    }
    consumer.assign(partitions);
    consumer.create_polling_strategy();

    RRConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);

    // Publish <messages_per_partition> messages into each partition.
    Producer producer(make_producer_config());
    string payload = "RoundRobin";
    for (int i = 0; i < total_messages; ++i) {
        producer.produce(MessageBuilder(KAFKA_TOPICS[0])
                             .partition(i % KAFKA_NUM_PARTITIONS)
                             .payload(payload));
    }
    producer.flush();
    runner.try_join();

    // Every produced message must have been consumed.
    REQUIRE(runner.get_messages().size() == total_messages);

    // Round-robin polling must alternate partitions message by message.
    // NOTE(review): the comparison is offset by one (i + 1) — presumably the
    // strategy advances to the next partition before its first poll; confirm
    // against RoundRobinPollStrategy.
    vector<int> partition_order = make_roundrobin_partition_vector(total_messages);
    for (int i = 0; i < total_messages; ++i) {
        REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i + 1]);
        REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
    }

    //============ resume original poll strategy =============//

    // Once the round-robin strategy is deleted, plain serial polling must
    // behave exactly as before.
    consumer.delete_polling_strategy();

    ConsumerRunner serial_runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);

    payload = "SerialPolling";
    // Publish <messages_per_partition> messages into each partition again.
    for (int i = 0; i < total_messages; ++i) {
        producer.produce(MessageBuilder(KAFKA_TOPICS[0])
                             .partition(i % KAFKA_NUM_PARTITIONS)
                             .payload(payload));
    }
    producer.flush();
    serial_runner.try_join();

    // Every produced message must have been consumed.
    REQUIRE(serial_runner.get_messages().size() == total_messages);

    // Delivery is grouped by partition again, as in the serial test.
    int expected_partition = -1;
    for (int i = 0; i < total_messages; ++i) {
        const Message& msg = serial_runner.get_messages()[i];
        if ((i % messages_per_partition) == 0) {
            expected_partition = msg.get_partition();
        }
        REQUIRE(msg.get_partition() == expected_partition);
        REQUIRE((string)msg.get_payload() == payload);
    }
}
153+

‎tests/test_main.cpp‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ using Catch::TestCaseStats;
1515
using Catch::Totals;
1616
using Catch::Session;
1717

18+
// Definitions for the globals declared `extern const` in test_utils.h.
// They must be `const` here to match those declarations, and `extern` is
// required so the namespace-scope const objects keep external linkage
// (a plain namespace-scope const would have internal linkage and the other
// translation units would fail to resolve the symbols).
extern const std::vector<std::string> KAFKA_TOPICS = {"cppkafka_test1", "cppkafka_test2"};
extern const int KAFKA_NUM_PARTITIONS = 3;
20+
1821
namespace cppkafka {
1922

2023
class InstantTestReporter : public ConsoleReporter {

‎tests/test_utils.cpp‎

Lines changed: 0 additions & 91 deletions
This file was deleted.

‎tests/test_utils.h‎

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,69 @@
44
#include <thread>
55
#include <vector>
66
#include "cppkafka/consumer.h"
7+
#include "cppkafka/utils/roundrobin_poll_strategy.h"
8+
#include "cppkafka/utils/consumer_dispatcher.h"
79

8-
class ConsumerRunner {
10+
extern const std::vector<std::string> KAFKA_TOPICS;
11+
extern const int KAFKA_NUM_PARTITIONS;
12+
13+
using namespace cppkafka;
14+
15+
//==================================================================================
16+
// BasicConsumerRunner
17+
//==================================================================================
18+
template <typename ConsumerType>
19+
class BasicConsumerRunner {
920
public:
10-
ConsumerRunner(cppkafka::Consumer& consumer, size_t expected, size_t partitions);
11-
ConsumerRunner(const ConsumerRunner&) = delete;
12-
ConsumerRunner& operator=(const ConsumerRunner&) = delete;
13-
~ConsumerRunner();
21+
BasicConsumerRunner(ConsumerType& consumer,
22+
size_t expected,
23+
size_t partitions);
24+
BasicConsumerRunner(const BasicConsumerRunner&) = delete;
25+
BasicConsumerRunner& operator=(const BasicConsumerRunner&) = delete;
26+
~BasicConsumerRunner();
1427

1528
const std::vector<cppkafka::Message>& get_messages() const;
1629

1730
void try_join();
1831
private:
19-
cppkafka::Consumer& consumer_;
32+
ConsumerType& consumer_;
2033
std::thread thread_;
2134
std::vector<cppkafka::Message> messages_;
2235
};
2336

37+
//==================================================================================
38+
// RoundRobinConsumer
39+
//==================================================================================
40+
/**
41+
* \brief Specific implementation which can be used with other
42+
* util classes such as BasicConsumerDispatcher.
43+
*/
44+
class RoundRobinConsumer : public Consumer
45+
{
46+
public:
47+
RoundRobinConsumer(Configuration config);
48+
void create_polling_strategy();
49+
void delete_polling_strategy();
50+
Message poll();
51+
Message poll(std::chrono::milliseconds timeout);
52+
MessageList poll_batch(size_t max_batch_size);
53+
MessageList poll_batch(size_t max_batch_size,
54+
std::chrono::milliseconds timeout);
55+
void set_timeout(std::chrono::milliseconds timeout);
56+
std::chrono::milliseconds get_timeout();
57+
private:
58+
std::unique_ptr<RoundRobinPollStrategy> strategy_;
59+
};
60+
61+
using RRConsumerDispatcher = BasicConsumerDispatcher<RoundRobinConsumer>;
62+
using RRConsumerRunner = BasicConsumerRunner<RoundRobinConsumer>;
63+
using ConsumerRunner = BasicConsumerRunner<Consumer>;
64+
65+
//==================================================================================
66+
// Helper functions
67+
//==================================================================================
68+
std::vector<int> make_roundrobin_partition_vector(int total_messages);
69+
70+
#include "test_utils_impl.h"
71+
2472
#endif // CPPKAFKA_TEST_UTILS_H

‎tests/test_utils_impl.h‎

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
#include <mutex>
2+
#include <chrono>
3+
#include <condition_variable>
4+
#include "test_utils.h"
5+
#include "cppkafka/utils/consumer_dispatcher.h"
6+
7+
using std::vector;
8+
using std::move;
9+
using std::thread;
10+
using std::mutex;
11+
using std::lock_guard;
12+
using std::unique_lock;
13+
using std::condition_variable;
14+
using std::chrono::system_clock;
15+
using std::chrono::milliseconds;
16+
using std::chrono::seconds;
17+
18+
using cppkafka::Consumer;
19+
using cppkafka::BasicConsumerDispatcher;
20+
21+
using cppkafka::Message;
22+
using cppkafka::MessageList;
23+
using cppkafka::TopicPartition;
24+
25+
//==================================================================================
26+
// BasicConsumerRunner
27+
//==================================================================================
28+
template <typename ConsumerType>
29+
BasicConsumerRunner<ConsumerType>::BasicConsumerRunner(ConsumerType& consumer,
30+
size_t expected,
31+
size_t partitions)
32+
: consumer_(consumer) {
33+
bool booted = false;
34+
mutex mtx;
35+
condition_variable cond;
36+
thread_ = thread([&, expected, partitions]() {
37+
consumer_.set_timeout(milliseconds(500));
38+
size_t number_eofs = 0;
39+
auto start = system_clock::now();
40+
BasicConsumerDispatcher<ConsumerType> dispatcher(consumer_);
41+
dispatcher.run(
42+
// Message callback
43+
[&](Message msg) {
44+
if (number_eofs == partitions) {
45+
messages_.push_back(move(msg));
46+
}
47+
},
48+
// EOF callback
49+
[&](typename BasicConsumerDispatcher<ConsumerType>::EndOfFile, const TopicPartition& topic_partition) {
50+
if (number_eofs != partitions) {
51+
number_eofs++;
52+
if (number_eofs == partitions) {
53+
lock_guard<mutex> _(mtx);
54+
booted = true;
55+
cond.notify_one();
56+
}
57+
}
58+
},
59+
// Every time there's any event callback
60+
[&](typename BasicConsumerDispatcher<ConsumerType>::Event) {
61+
if (expected > 0 && messages_.size() == expected) {
62+
dispatcher.stop();
63+
}
64+
if (expected == 0 && number_eofs >= partitions) {
65+
dispatcher.stop();
66+
}
67+
if (system_clock::now() - start >= seconds(20)) {
68+
dispatcher.stop();
69+
}
70+
}
71+
);
72+
// dispatcher has stopped
73+
if (number_eofs < partitions) {
74+
lock_guard<mutex> _(mtx);
75+
booted = true;
76+
cond.notify_one();
77+
}
78+
});
79+
80+
unique_lock<mutex> lock(mtx);
81+
while (!booted) {
82+
cond.wait(lock);
83+
}
84+
}
85+
86+
template <typename ConsumerType>
87+
BasicConsumerRunner<ConsumerType>::~BasicConsumerRunner() {
88+
try_join();
89+
}
90+
91+
template <typename ConsumerType>
92+
const MessageList& BasicConsumerRunner<ConsumerType>::get_messages() const {
93+
return messages_;
94+
}
95+
96+
template <typename ConsumerType>
97+
void BasicConsumerRunner<ConsumerType>::try_join() {
98+
if (thread_.joinable()) {
99+
thread_.join();
100+
}
101+
}
102+
103+
//==================================================================================
104+
// RoundRobinConsumer
105+
//==================================================================================
106+
inline
107+
RoundRobinConsumer::RoundRobinConsumer(Configuration config)
108+
: Consumer(config) {
109+
}
110+
111+
inline
112+
void RoundRobinConsumer::create_polling_strategy() {
113+
strategy_.reset(new RoundRobinPollStrategy(*this));
114+
}
115+
116+
inline
117+
void RoundRobinConsumer::delete_polling_strategy() {
118+
strategy_.reset();
119+
}
120+
121+
inline
122+
Message RoundRobinConsumer::poll() {
123+
if (strategy_) {
124+
return strategy_->poll();
125+
}
126+
return Consumer::poll();
127+
}
128+
129+
inline
130+
Message RoundRobinConsumer::poll(milliseconds timeout) {
131+
if (strategy_) {
132+
return strategy_->poll(timeout);
133+
}
134+
return Consumer::poll(timeout);
135+
}
136+
137+
inline
138+
MessageList RoundRobinConsumer::poll_batch(size_t max_batch_size) {
139+
if (strategy_) {
140+
return strategy_->poll_batch(max_batch_size);
141+
}
142+
return Consumer::poll_batch(max_batch_size);
143+
}
144+
145+
inline
146+
MessageList RoundRobinConsumer::poll_batch(size_t max_batch_size,
147+
milliseconds timeout) {
148+
if (strategy_) {
149+
return strategy_->poll_batch(max_batch_size, timeout);
150+
}
151+
return Consumer::poll_batch(max_batch_size, timeout);
152+
}
153+
154+
inline
155+
void RoundRobinConsumer::set_timeout(milliseconds timeout) {
156+
if (strategy_) {
157+
strategy_->set_timeout(timeout);
158+
}
159+
else {
160+
Consumer::set_timeout(timeout);
161+
}
162+
}
163+
164+
inline
165+
milliseconds RoundRobinConsumer::get_timeout() {
166+
if (strategy_) {
167+
return strategy_->get_timeout();
168+
}
169+
return Consumer::get_timeout();
170+
}
171+
172+
//==================================================================================
173+
// Helper functions
174+
//==================================================================================
175+
inline
176+
vector<int> make_roundrobin_partition_vector(int total_messages) {
177+
vector<int> partition_order;
178+
//int messages_per_partition = total_messages / KAFKA_NUM_PARTITIONS;
179+
for (int i = 0, partition = 0; i < total_messages+1; ++i) {
180+
if ((i % KAFKA_NUM_PARTITIONS) == 0) {
181+
partition = 0;
182+
}
183+
partition_order.push_back(partition++);
184+
}
185+
return partition_order;
186+
}
187+

0 commit comments

Comments
 (0)
Please sign in to comment.