
improve performance of Table.add_files by parallelizing #1335

Open
vtk9 opened this issue Nov 18, 2024 · 9 comments

Comments

@vtk9

vtk9 commented Nov 18, 2024

Feature Request / Improvement

Table.add_files() processes the list of files sequentially. Part of this flow can be parallelized, particularly:

    data_files = _parquet_files_to_data_files(
        table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
    )

@kevinjqliu
Contributor

_parquet_files_to_data_files is a generator and uses parquet_files_to_data_files, which is also a generator.

What do you have in mind to parallelize this part of the code?

@bigluck
Contributor

bigluck commented Nov 20, 2024

I believe @vtk9 is suggesting that the files be read in parallel rather than sequentially.

I could be mistaken, but it seems that if you have 10,000 files, each one is read one after the other. This approach can be quite time-consuming, even though I understand that we are only reading the metadata of each parquet file.

One option could be to have something like (pseudo-code alert):

def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for file_path in file_paths:
            futures.append(executor.submit(scan_file, file_path))
        for future in concurrent.futures.as_completed(futures):
            yield future.result()
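
A self-contained, runnable version of that pattern might look like the sketch below. The `scan_file` helper here is hypothetical, a stand-in for the real metadata read (in pyiceberg, that step would build a DataFile from the parquet footer):

```python
import concurrent.futures
from typing import Dict, Iterator, List

def scan_file(file_path: str) -> Dict[str, object]:
    # Hypothetical stand-in for reading a parquet footer; the real code
    # would open the file via FileIO and build a DataFile from its metadata.
    return {"path": file_path, "record_count": 100}

def files_to_metadata(file_paths: List[str]) -> Iterator[Dict[str, object]]:
    # Submit every file to a thread pool and yield results as they complete.
    # Note: as_completed yields in completion order, not submission order.
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(scan_file, p) for p in file_paths]
        for future in concurrent.futures.as_completed(futures):
            yield future.result()

results = list(files_to_metadata([f"file_{i}.parquet" for i in range(4)]))
```

One caveat of `as_completed` is that results arrive out of submission order; that should be acceptable here since `add_files` does not depend on file ordering.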

@vtk9
Author

vtk9 commented Nov 20, 2024

Apologies @kevinjqliu, I forgot to link the relevant Slack thread: https://apache-iceberg.slack.com/archives/C029EE6HQ5D/p1731611943890879

Exactly, thank you @bigluck!
I tried something like this and there's a noticeable improvement even when add_files contains ~30 files.

@kevinjqliu
Contributor

Thanks @bigluck, that makes sense! I think _parquet_files_to_data_files might be a good place to add the parallelism.

@vtk9 is this something you would like to contribute?

@kevinjqliu
Contributor

@vtk9 thanks for the context from slack, I must have missed that thread

@vtk9
Author

vtk9 commented Nov 20, 2024

@kevinjqliu when I find time, yes.

I would definitely love this feature in the next release of pyiceberg and will prioritize it, given enough heads-up before a release (if possible).

@kevinjqliu
Contributor

sounds good! Feel free to ping me for review. I'll add this issue to the 0.8.1 milestone for now

@kevinjqliu kevinjqliu added this to the PyIceberg 0.8.1 release milestone Nov 20, 2024
@Fokko
Contributor

Fokko commented Nov 20, 2024

Yes, looks like this shouldn't be too hard. I think it would be good to re-use the ExecutorFactory:
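
For context, the shape of such a factory is a process-wide singleton around a thread pool, so all callers share one executor. A minimal sketch of that idea (an illustration, not pyiceberg's actual implementation) is:

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional

class ExecutorFactory:
    """Illustrative singleton so all callers share one thread pool."""

    _instance: Optional[Executor] = None

    @staticmethod
    def get_or_create() -> Executor:
        # Lazily create the shared executor on first use.
        if ExecutorFactory._instance is None:
            ExecutorFactory._instance = ThreadPoolExecutor()
        return ExecutorFactory._instance
```

Reusing one shared pool avoids spinning up a fresh ThreadPoolExecutor on every add_files call and keeps the worker count configurable in a single place.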

I would refactor parquet_files_to_data_files to let it take a single file instead of an Iterator, and then call it parquet_file_to_data_file.

def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
    """Convert a list of files into DataFiles.

    Returns:
        An iterable that supplies DataFiles that describe the parquet files.
    """
    # The refactored single-file helper proposed above.
    from pyiceberg.io.pyarrow import parquet_file_to_data_file

    executor = ExecutorFactory.get_or_create()
    futures = [
        executor.submit(
            parquet_file_to_data_file,
            io,
            table_metadata,
            file_path,
        )
        for file_path in file_paths
    ]

    # Call .result() once per future and drop any empty results.
    return [data_file for f in futures if (data_file := f.result())]

@kevinjqliu I would not classify this as a bugfix, so I'm not sure if this is appropriate for 0.8.1.

@kevinjqliu kevinjqliu removed this from the PyIceberg 0.8.1 release milestone Nov 20, 2024
@kevinjqliu
Contributor

Makes sense, this is a feature

@Fokko Fokko added this to the PyIceberg 0.9.0 release milestone Nov 20, 2024