diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 0ee3f93d..8f3a9b2c 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -708,6 +708,7 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti }; re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_); re_enqueuer(flush_queue, messages_, mutex_); + return true; } else { async_flush(); @@ -811,7 +812,7 @@ void BufferedProducer::do_add_message(BuilderType&& build // Flush the queues only if a regular message is added. Retry messages may be added // from rdkafka callbacks, and flush/async_flush is a user-level call - if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) { + if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)get_buffer_size())) { if (flush_method_ == FlushMethod::Sync) { flush(); }