
Bug fixes for sync flush and add_tracker #91


Merged
merged 2 commits into mfontanini:master from buffered_producer_sync_fix on Jun 18, 2018

Conversation

accelerated
Contributor

Two bugs were fixed:

  1. SenderType == Sync was not set for sync_producer.
  2. Synchronous flushing could reorder messages if a message failed and was retried. The original BufferedProducer implementation re-posted the failed Message from within the on_delivery_report callback, which guaranteed the original order whenever a message had to be retried (see the sketch below).
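
For reference, here is a minimal sketch of the idea behind fix 2, not the actual patch: a failed message is re-produced from inside the delivery-report handler, before any newer buffered message is sent again, so the original ordering survives the retry. The handle_delivery_report function and the producer reference passed into it are illustrative only.

```cpp
#include <cppkafka/message.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/producer.h>

// Illustrative only: re-post a failed message from within the delivery
// report handler so it goes out again before any newer message.
void handle_delivery_report(cppkafka::Producer& producer, const cppkafka::Message& msg) {
    if (msg.get_error()) {
        // Rebuild the same payload for the same topic/partition and produce it
        // immediately. (The real BufferedProducer also tracks retry counts and
        // its internal buffer state.)
        producer.produce(cppkafka::MessageBuilder(msg.get_topic())
                             .partition(msg.get_partition())
                             .payload(msg.get_payload()));
    }
}
```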

@accelerated accelerated changed the title fixes for sync flush and also add_tracker Bug fixes for sync flush and also add_tracker Jun 18, 2018
@accelerated accelerated changed the title Bug fixes for sync flush and also add_tracker Bug fixes for sync flush and add_tracker Jun 18, 2018
@accelerated
Contributor Author

Note: ideally, if order preservation is needed together with a retry mechanism, users should set BufferedProducer::set_max_number_retries() to a value > 0 and then call sync_produce or flush. They should also set message.send.max.retries = 0 in rdkafka, since rdkafka's own retries may interleave messages.
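
As a rough illustration of the note above (the broker address and retry count are placeholders, not recommendations):

```cpp
#include <string>
#include <cppkafka/utils/buffered_producer.h>

int main() {
    // message.send.max.retries = 0 disables rdkafka's own retries, which could
    // otherwise interleave messages; BufferedProducer then owns the retry logic.
    cppkafka::Configuration config = {
        { "metadata.broker.list", "localhost:9092" },  // placeholder broker
        { "message.send.max.retries", "0" }
    };

    cppkafka::BufferedProducer<std::string> producer(config);
    producer.set_max_number_retries(3);  // > 0 so failed messages are retried in order

    // ... add messages, then call flush() or sync_produce() per message
}
```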

@@ -527,8 +529,16 @@ void BufferedProducer<BufferType>::async_flush() {

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
    async_flush();
    wait_for_acks();
    CounterGuard<size_t> counter_guard(flushes_in_progress_);
mfontanini
Owner

I understand that this is trying to preserve the ordering, but this is extremely inefficient. I'm not sure this should be the default behavior. Many applications just care about the entire batch of messages being produced; they don't care too much about the exact order in which that happens. Maybe this belongs in a separate method or something like that.

@accelerated
Contributor Author

Well, I guess there's flush and async_flush, and we can add order_preserve_flush()? Or add a default parameter, flush(bool preserve_order = false), which is OK since BufferedProducer is a class template, so it won't break the ABI (see the sketch at the end of this comment).
Speaking of rdkafka retries, it's not clear how the retry works: is it the entire batch of messages or only the messages that failed? I can't imagine Kafka rejecting messages 2 and 4 but not messages 1, 3 and 5. I can, however, see it accepting 1, 2, 3 and rejecting 4 and 5 in the batch. Any ideas?
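
For illustration, the suggested default parameter could look roughly like this (a sketch of the proposal, not the merged API):

```cpp
template <typename BufferType>
class BufferedProducer {
public:
    // preserve_order = true flushes synchronously, one message at a time,
    // retrying failures in place; false keeps the faster batched behavior.
    // Since BufferedProducer is a header-only class template, adding a
    // defaulted argument does not break any compiled ABI.
    void flush(bool preserve_order = false);
    // ... rest of the existing interface
};
```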

@mfontanini
Owner

I think Kafka could actually reject messages 2 and 4 but not 3. There's a leader for each partition, so if the leader of message 3's partition is up but the leader(s) for the partitions of messages 2 and 4 are having issues, I think this could happen.

I think adding a default parameter may work? There should be some documentation indicating something like "Be aware that preserving the order can add significant time overhead to producing messages".

@accelerated
Contributor Author

OK, I'll put a flag in the flush function.

In reference to what you're saying, my understanding was that the batches are per-partition, and in that case, they only go to the leader of that partition. If messages in the batch are intermixed (i.e. one queue per topic instead of per-partition), then your assumption would be correct. However, producing in batches is not necessarily MUCH more performant: you get more efficient bandwidth utilization, but then you run into IP fragmentation, reassembly and similar issues. Batching also means delaying sending the message to the remote application, so while one client is waiting to batch, say, 100 messages, another non-batched client may have already sent its messages and received 50 acks. Only proper benchmarking (and that's very dependent on the network setup) can say what the performance increase is.
Most TCP/IP socket implementations don't really batch more than what the kernel buffer holds, and in the TCP case only what fits into the sliding window gets sent, probably far less than what rdkafka batches.

BTW, if you don't mind sharing, how do you use the BufferedProducer? Do you do most of the buffering inside it and try not to use rdkafka's internal buffers? Do you only buffer when throttling? Or do you buffer when you need to do ack counting?

@accelerated accelerated force-pushed the buffered_producer_sync_fix branch from 0851cac to 4d44dc9 on June 18, 2018 20:49
@mfontanini
Owner

> my understanding was that the batches are per-partition, and in that case, they only go to the leader of that partition

Right, which means that if messages 1 and 3 go to partition 1 and message 2 goes to partition 2, the batch for (1, 3) can fail while the batch for 2 succeeds, so that could happen.

The idea of batching is to amortize the round trips between the client and Kafka. You don't want to go to the Kafka server and say "hey, I'm producing this one message" 100 times; you'd rather go to the server a single time and say "hey, I'm producing these 100 messages". Requests and responses take time to travel over the wire, so you don't want to make more of them than you should.

> BTW, if you don't mind sharing, how do you use the BufferedProducer? Do you do most of the buffering inside it and try not to use rdkafka's internal buffers? Do you only buffer when throttling? Or do you buffer when you need to do ack counting?

I can normally split my data into chunks, so I gather the data for a chunk and, once I'm done, I generate the output data and simply call produce_message and then flush (meaning wait_for_acks). I don't really need to buffer the data, and I need to process an entire chunk before generating the output, so there's no reason for me to buffer messages instead of producing them right away. I also throttle on consumption: if I'm taking too long to process messages (or I'm just waiting for an input chunk to finish arriving), I just pause the consumption. Once I catch up, I resume and keep the process going.
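
A rough sketch of that workflow, assuming process_chunk, chunk_complete and make_output are application-specific helpers (they are not part of cppkafka):

```cpp
#include <string>
#include <cppkafka/consumer.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/utils/buffered_producer.h>

// Hypothetical application helpers, assumed to be defined elsewhere:
void process_chunk(const cppkafka::Message& msg);  // accumulate input data
bool chunk_complete();                             // has a full chunk arrived?
cppkafka::MessageBuilder make_output();            // build the output message

void run(cppkafka::Consumer& consumer,
         cppkafka::BufferedProducer<std::string>& producer) {
    while (true) {
        cppkafka::Message msg = consumer.poll();
        if (!msg || msg.get_error()) {
            continue;
        }
        process_chunk(msg);
        if (!chunk_complete()) {
            continue;
        }
        consumer.pause();                 // throttle consumption while producing
        producer.produce(make_output());
        producer.flush();                 // i.e. wait until everything is acked
        consumer.resume();
    }
}
```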

@accelerated
Contributor Author

accelerated commented Jun 18, 2018

Thanks for the input. My data is quite different from yours and ordering is very important, but I have it configurable by the app, so if ordering is not important for a certain topic, it can be configured to just do normal batching.

What I meant earlier about the "batches per-partition" is that the output rdkafka queues are per-partition, so when a batch is sent for partition A, all messages in that batch are for the same partition, so either all of them can fail or perhaps just the first X messages in the batch. But maybe I'm wrong. Anyway, thanks for reviewing this!

@mfontanini
Owner

> is that the output rdkafka queues are per-partition, so when a batch is sent for partition A, all messages in that batch are for the same partition, so all can fail

Right, but you can add messages for multiple partitions, so when you flush you could send multiple batches, one (or more?) per partition. Each individual batch can fail, so if some partition leader is broken, that particular batch will fail while the rest succeed. That means you could end up in the situation you mentioned, where 2 succeeds but 1 and 3 fail because they belong to another batch going to a different broker.

@mfontanini mfontanini merged commit eb46b88 into mfontanini:master Jun 18, 2018
@accelerated accelerated deleted the buffered_producer_sync_fix branch June 19, 2018 01:24