
[C++] RecordBatchReader failed when reading parquet file #45116

Open

PPParticle opened this issue Dec 28, 2024 · 7 comments

Comments

@PPParticle commented Dec 28, 2024

Describe the bug, including details regarding any error messages, version, and platform.

I tried to use arrow::RecordBatchReader to read multiple row groups from a Parquet file in parallel, acquiring each reader via GetRecordBatchReader. However, I noticed that when the number of tasks exceeded the number of cores, reading would stall at RETURN_NOT_OK(ReadNext(&batch));. The readers only work when the number of tasks is less than the number of cores.
Here is my code:

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/thread_pool.h>
#include <arrow/util/parallel.h>
#include <arrow/util/range.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/column_reader.h>
#include <parquet/column_scanner.h>
#include <parquet/exception.h>
#include <parquet/types.h>
#include <cstring>
#include <chrono>
#include <iostream>

template <class FUNCTION, typename T,
          typename R = typename arrow::internal::call_traits::return_type<FUNCTION>::ValueType>
arrow::Future<std::vector<R>> ParallelForAsync_test(
    std::vector<T> inputs, FUNCTION&& func, 
    arrow::internal::Executor* executor = arrow::internal::GetCpuThreadPool()) {
  std::vector<arrow::Future<R>> futures(inputs.size());
  for (size_t i = 0; i < inputs.size(); ++i) {
    ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
  }
  return All(std::move(futures))
      .Then([](const std::vector<arrow::Result<R>>& results) -> arrow::Result<std::vector<R>> {
        return arrow::internal::UnwrapOrRaise(results);
      });
}

arrow::Result<std::shared_ptr<arrow::Array>> ChunkedArrayToArray(std::shared_ptr<arrow::ChunkedArray> chunked_array) {
  auto arrays = chunked_array->chunks();
  std::shared_ptr<arrow::Array> result;
  ARROW_ASSIGN_OR_RAISE(result, arrow::Concatenate(arrays));
  return result;
}

arrow::Status read_whole_file(std::string file, int batch_size, int &size) {

  ::arrow::MemoryPool* pool = ::arrow::default_memory_pool();

  auto reader_properties = parquet::ReaderProperties(pool);
  reader_properties.set_buffer_size(4096 * 4);
  reader_properties.enable_buffered_stream();
  
  auto arrow_reader_props = parquet::ArrowReaderProperties();
  arrow_reader_props.set_batch_size(4 * 1024);
  arrow_reader_props.set_use_threads(true);

  parquet::arrow::FileReaderBuilder reader_builder;
  ARROW_RETURN_NOT_OK(reader_builder.OpenFile(file, false, reader_properties));
  reader_builder.memory_pool(pool);
  reader_builder.properties(arrow_reader_props);

  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build());

  auto p_reader = arrow_reader->parquet_reader();
  int nrgs = p_reader->metadata()->num_row_groups();
  int64_t nrows = p_reader->metadata()->num_rows();

  int ncolumns = p_reader->metadata()->num_columns();
  auto cpu_executor = ::arrow::internal::GetCpuThreadPool();

  int rg_batchsize = nrgs / batch_size;

  std::vector<std::shared_ptr<arrow::RecordBatchReader>> vec_reader;
  std::cout << "total has " << nrgs << " rowgroups" << std::endl;
  for (int j = 0; j <= rg_batchsize; j++) {
    std::shared_ptr<arrow::RecordBatchReader> rb_reader;
    ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(
        arrow::internal::Iota(j * batch_size, std::min((j + 1) * batch_size, nrgs)), &rb_reader));
    std::cout << "task " << j << " range [" << j * batch_size << ","
              << std::min((j + 1) * batch_size, nrgs) << "]" << std::endl;
    vec_reader.emplace_back(rb_reader);
  }
  size = vec_reader.size();
  std::cout << size << std::endl;
  auto thread_start = std::chrono::high_resolution_clock::now();
  auto read_recordbatch = [ncolumns, thread_start](size_t i, std::shared_ptr<::arrow::RecordBatchReader> reader)
        -> ::arrow::Result<bool>{
        auto io_start = std::chrono::high_resolution_clock::now();
        auto result = reader->ToTable();
        std::vector<std::shared_ptr<::arrow::Array>> vec_array;
        if (result.ok()) {
          auto table = *result;
          for (int c = 0; c < ncolumns; c++) {
            auto col_result = ChunkedArrayToArray(table->column(c));
            if (col_result.ok()) {
              vec_array.emplace_back(*col_result);
            }
          }
        }
        auto io_end = std::chrono::high_resolution_clock::now();
        
        ARROW_RETURN_NOT_OK(reader->Close());
        std::cout << "thread " << i << " " << vec_array[0]->length() << " rows " 
                    << "start_overhead " << std::chrono::duration<double, std::milli>(io_start-thread_start).count() << " ms "
                    << "io_overhead " << std::chrono::duration<double, std::milli>(io_end-io_start).count() << " ms" << std::endl;
        return true;

  };

  auto re = ParallelForAsync_test(std::move(vec_reader), read_recordbatch, cpu_executor)
                .MoveResult();
  ARROW_RETURN_NOT_OK(re.status());

  return ::arrow::Status::OK();
}

int main(int argc, char* argv[]) {
  if (argc < 3) {
    std::cerr << "usage: " << argv[0] << " <parquet file> <batch size>" << std::endl;
    return 1;
  }

  std::string file = argv[1];
  int batch_size = atoi(argv[2]);
  int size = 0;
  auto start = std::chrono::steady_clock::now();
  auto status = read_whole_file(file, batch_size, size);
  auto end = std::chrono::steady_clock::now();
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
  }

  printf("%d thread(s) %f ms\n", size, std::chrono::duration<double, std::milli>(end-start).count());

  return 0;
}

Component(s)

C++

@PPParticle (Author)

The Arrow version I use is 14.0.0.

@PPParticle (Author) commented Dec 28, 2024

Furthermore, I tried increasing the capacity of the thread pool. The tasks no longer stalled, but the program hit a segmentation fault and did not read any data at all.
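
For reference, Arrow's global CPU pool can be resized with arrow::SetCpuThreadPoolCapacity from arrow/util/thread_pool.h. The comment above does not show how the pool was enlarged, so this is only a minimal sketch of one way to do it (the value 64 is arbitrary, for illustration):

#include <arrow/util/thread_pool.h>
#include <iostream>

int main() {
  // Default capacity follows the hardware concurrency.
  std::cout << "default capacity: " << arrow::GetCpuThreadPoolCapacity() << std::endl;
  // Raise the global CPU pool capacity before submitting any tasks.
  auto st = arrow::SetCpuThreadPoolCapacity(64);
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  std::cout << "new capacity: " << arrow::GetCpuThreadPoolCapacity() << std::endl;
  return 0;
}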

@kou kou changed the title recordbatchreader failed when reading parquet file [C++] recordbatchreader failed when reading parquet file Dec 29, 2024
@kou kou changed the title [C++] recordbatchreader failed when reading parquet file [C++] RecordBatchReader failed when reading parquet file Dec 29, 2024
@kou (Member) commented Dec 29, 2024

Could you try the latest release?

@kou (Member) commented Dec 29, 2024

Could you provide a Parquet file that reproduces this problem?

@PPParticle (Author)

> Could you provide a Parquet file that reproduces this problem?

customer.zip

I added a guard before GetRecordBatchReader to make sure the scan ranges are valid: if (j * batch_size >= std::min((j + 1) * batch_size, nrgs)) break;. This avoided the empty [x,x] scan range. However, it still did not fix the problem, especially when running the compiled program with ./a.out customer.parquet 1.
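
For clarity, this is how the described guard would sit in the row-group loop from the program above (a sketch of the change as described, not code posted verbatim in the thread):

  for (int j = 0; j <= rg_batchsize; j++) {
    // Skip the empty trailing range produced when batch_size divides nrgs evenly.
    if (j * batch_size >= std::min((j + 1) * batch_size, nrgs)) break;
    std::shared_ptr<arrow::RecordBatchReader> rb_reader;
    ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(
        arrow::internal::Iota(j * batch_size, std::min((j + 1) * batch_size, nrgs)), &rb_reader));
    vec_reader.emplace_back(rb_reader);
  }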

@wgtmac (Member) commented Dec 30, 2024

> int rg_batchsize = nrgs / batch_size;

This line looks weird to me. Please note that GetRecordBatchReader cannot read only partial rows from a single row group. The maximum parallelism is bounded by the number of row groups in your Parquet file.
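
To illustrate that bound: the finest split GetRecordBatchReader supports is one reader per row group, so the largest number of useful parallel tasks looks like this (a sketch reusing the identifiers from the reporter's code above):

  // One RecordBatchReader per row group: the finest granularity available,
  // because a single row group cannot be split across readers.
  std::vector<std::shared_ptr<arrow::RecordBatchReader>> readers;
  for (int rg = 0; rg < nrgs; ++rg) {
    std::shared_ptr<arrow::RecordBatchReader> rb_reader;
    ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader({rg}, &rb_reader));
    readers.emplace_back(std::move(rb_reader));
  }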

@PPParticle (Author)

> The maximum parallelism is bounded by the number of row groups in your Parquet file.

I understand what you have said.
The line int rg_batchsize = nrgs / batch_size; means I merge batch_size row groups into one reader, and the RecordBatchReader reads those row groups together. The reason I did this is that I could not change the Parquet file, but I wanted to increase the I/O granularity of each thread.
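
As an aside on that arithmetic: nrgs / batch_size truncates, so the j <= rg_batchsize loop needs the guard above to avoid an empty trailing range. Ceiling division gives the same grouping without the guard (a sketch, not code from the thread):

  // Number of readers needed to cover nrgs row groups in chunks of batch_size.
  int num_tasks = (nrgs + batch_size - 1) / batch_size;  // ceiling division
  for (int j = 0; j < num_tasks; ++j) {
    int begin = j * batch_size;
    int end = std::min(begin + batch_size, nrgs);  // half-open [begin, end)
    std::shared_ptr<arrow::RecordBatchReader> rb_reader;
    ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(
        arrow::internal::Iota(begin, end), &rb_reader));
    vec_reader.emplace_back(std::move(rb_reader));
  }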
