Skip to content

refactor: table provider #1397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions bindings/python/python/pyiceberg_core/datafusion.pyi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs license header

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import annotations

from typing import Any

class IcebergDataFusionTable:
def __init__(
self,
identifier: list[str],
metadata_location: str,
storage_options: dict[str, str] | None,
) -> None:
"""Create Iceberg table that can be registered as a datafusion table provider

Example:
```python
from pyiceberg_core.datafusion import IcebergDataFusionTable
from datafusion import SessionContext

ice_tbl = IcebergDataFusionTable()
ctx = SessionContext()
ctx.register_table_provider("test", ice_tbl)
ctx.table("test").show()
```
Results in
```
DataFrame()
+----+----+----+
| c3 | c1 | c2 |
+----+----+----+
| 4 | 6 | a |
| 6 | 5 | b |
| 5 | 4 | c |
+----+----+----+
```
"""

def __datafusion_table_provider__(self) -> Any:
"""Return the DataFusion table provider PyCapsule interface.

To support DataFusion features such as push down filtering, this function will return a PyCapsule
interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective
you should not need to call this function directly. Instead you can use ``register_table_provider`` in
the DataFusion SessionContext.

Returns:
A PyCapsule DataFusion TableProvider interface.
"""
4 changes: 2 additions & 2 deletions bindings/python/src/datafusion_table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl PyIcebergDataFusionTable {
fn new(
identifier: Vec<String>,
metadata_location: String,
file_io_properties: Option<HashMap<String, String>>,
storage_options: Option<HashMap<String, String>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we release 0.5.1 with file_io_properties so this is a breaking change, but i do like storage_options more

) -> PyResult<Self> {
let runtime = runtime();

Expand All @@ -52,7 +52,7 @@ impl PyIcebergDataFusionTable {
let mut builder = FileIO::from_path(&metadata_location)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to init FileIO: {e}")))?;

if let Some(props) = file_io_properties {
if let Some(props) = storage_options {
builder = builder.with_props(props);
}

Expand Down
15 changes: 8 additions & 7 deletions bindings/python/tests/test_datafusion_table_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
# under the License.


from datetime import date, datetime
import uuid
from datetime import date, datetime
from pathlib import Path

import datafusion
import pyarrow as pa
import pytest
from pyiceberg_core.datafusion import IcebergDataFusionTable
from datafusion import SessionContext
from pyiceberg.catalog import Catalog, load_catalog
import pyarrow as pa
from pathlib import Path
import datafusion
from pyiceberg_core.datafusion import IcebergDataFusionTable

assert (
datafusion.__version__ >= "45"
Expand Down Expand Up @@ -102,7 +103,7 @@ def test_register_iceberg_table_provider(
iceberg_table_provider = IcebergDataFusionTable(
identifier=iceberg_table.name(),
metadata_location=iceberg_table.metadata_location,
file_io_properties=iceberg_table.io.properties,
storage_options=iceberg_table.io.properties,
)

ctx = SessionContext()
Expand Down Expand Up @@ -146,7 +147,7 @@ def __datafusion_table_provider__(self):
return IcebergDataFusionTable(
identifier=self.name(),
metadata_location=self.metadata_location,
file_io_properties=self.io.properties,
storage_options=self.io.properties,
).__datafusion_table_provider__()

iceberg_table.__datafusion_table_provider__ = MethodType(
Expand Down
Loading