arrow/cpp/src/parquet/arrow/reader.cc
RowGroupRecordBatchReader
// Adapts an Iterator<std::shared_ptr<RecordBatch>> to the RecordBatchReader
// pull interface used by consumers of the Arrow C++ API.
class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
 public:
  RowGroupRecordBatchReader(::arrow::RecordBatchIterator batches,
                            std::shared_ptr<::arrow::Schema> schema)
      : batches_(std::move(batches)), schema_(std::move(schema)) {}

  ~RowGroupRecordBatchReader() override {}

  // Pulls the next batch from the wrapped iterator; stores nullptr at the
  // end of the stream, per the RecordBatchReader contract.
  Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
    return batches_.Next().Value(out);
  }

  std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }

 private:
  ::arrow::Iterator<std::shared_ptr<::arrow::RecordBatch>> batches_;
  std::shared_ptr<::arrow::Schema> schema_;
};
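A minimal sketch of how this adapter is driven, assuming it sits in the same translation unit as the class above. DrainBatches and its arguments are hypothetical stand-ins; MakeVectorIterator and the reader itself come from the listing:

// Hypothetical helper: drain pre-materialized batches through the adapter.
::arrow::Status DrainBatches(::arrow::RecordBatchVector batches,
                             std::shared_ptr<::arrow::Schema> schema) {
  auto reader = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
      ::arrow::MakeVectorIterator(std::move(batches)), std::move(schema));

  // Standard RecordBatchReader pull loop: ReadNext() stores nullptr once the
  // wrapped iterator is exhausted.
  std::shared_ptr<::arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    // ... consume `batch` ...
  }
  return ::arrow::Status::OK();
}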
GetRecordBatchReader
https://github.com/apache/arrow/blob/61c15ed2f2ba1f77650de1eebace10f32cf5a695/cpp/src/parquet/arrow/reader.cc#L950
Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
                                            const std::vector<int>& column_indices,
                                            std::unique_ptr<RecordBatchReader>* out) {
  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

  if (reader_properties_.pre_buffer()) {
    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
                       reader_properties_.cache_options());
    END_PARQUET_CATCH_EXCEPTIONS
  }

  std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
  std::shared_ptr<::arrow::Schema> batch_schema;
  RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));

  if (readers.empty()) {
    // Just generate all batches right now; they're cheap since they have no columns.
    int64_t batch_size = properties().batch_size();
    auto max_sized_batch =
        ::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});

    ::arrow::RecordBatchVector batches;

    for (int row_group : row_groups) {
      int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();

      // Emit num_rows / batch_size full-sized empty batches...
      batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);

      // ...plus one shorter batch for any remainder.
      if (int64_t trailing_rows = num_rows % batch_size) {
        batches.push_back(max_sized_batch->Slice(0, trailing_rows));
      }
    }

    *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
        ::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));

    return Status::OK();
  }
  int64_t num_rows = 0;
  for (int row_group : row_groups) {
    num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
  }

  using ::arrow::RecordBatchIterator;

  // NB: This lambda will be invoked outside the scope of this call to
  // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema`
  // by value. `this` is a non-owning pointer so we are relying on the parent
  // FileReader outliving this RecordBatchReader.
  ::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
      [readers, batch_schema, num_rows,
       this]() mutable -> ::arrow::Result<RecordBatchIterator> {
        ::arrow::ChunkedArrayVector columns(readers.size());

        // don't reserve more rows than necessary
        int64_t batch_size = std::min(properties().batch_size(), num_rows);
        num_rows -= batch_size;

        // Decode the next batch_size rows of every column, in parallel when
        // use_threads is set.
        RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
            reader_properties_.use_threads(), static_cast<int>(readers.size()),
            [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));

        for (const auto& column : columns) {
          if (column == nullptr || column->length() == 0) {
            return ::arrow::IterationTraits<RecordBatchIterator>::End();
          }
        }

        auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
        auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);

        // NB: explicitly preserve table so that table_reader doesn't outlive it
        return ::arrow::MakeFunctionIterator(
            [table, table_reader] { return table_reader->Next(); });
      });

  *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
      ::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema));

  return Status::OK();
}
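For orientation, a hedged end-to-end sketch of reaching this code path through the public parquet::arrow API of this era. ScanFirstRowGroup and the file path are placeholders; OpenFile, set_use_threads, and the row-group overload of GetRecordBatchReader are the public entry points that forward to FileReaderImpl above:

#include "arrow/io/file.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "parquet/arrow/reader.h"

// Hypothetical caller: stream row group 0 of a Parquet file batch by batch.
::arrow::Status ScanFirstRowGroup(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto infile, ::arrow::io::ReadableFile::Open(path));

  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(infile, ::arrow::default_memory_pool(), &reader));
  reader->set_use_threads(true);  // lets OptionalParallelFor above fan out

  // The public overload selects all columns and lands in
  // FileReaderImpl::GetRecordBatchReader shown above.
  std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
  ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader({0}, &batch_reader));

  std::shared_ptr<::arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(batch_reader->ReadNext(&batch));
    if (batch == nullptr) break;  // all selected row groups drained
    // ... process `batch` ...
  }
  return ::arrow::Status::OK();
}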