Skip to content

Commit 6a3c530

Browse files
committed
Support http(s) stores
1 parent b655643 commit 6a3c530

File tree

10 files changed

+156
-7
lines changed

10 files changed

+156
-7
lines changed

.devcontainer/.env

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,9 @@ AZURE_TEST_CONTAINER_NAME=testcontainer
1818
AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D"
1919
AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"
2020

21+
# http(s) tests
22+
ALLOW_HTTP=true
23+
HTTP_ENDPOINT=http://localhost:8080
24+
2125
# Others
2226
RUST_TEST_THREADS=1

.devcontainer/docker-compose.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ services:
2020
depends_on:
2121
- minio
2222
- azurite
23+
- webdav
2324

2425
minio:
2526
image: minio/minio
@@ -47,3 +48,16 @@ services:
4748
interval: 6s
4849
timeout: 2s
4950
retries: 3
51+
52+
webdav:
53+
image: rclone/rclone
54+
command: ["serve", "webdav", "/data", "--addr", ":8080"]
55+
env_file:
56+
- .env
57+
network_mode: host
58+
restart: unless-stopped
59+
healthcheck:
60+
test: ["CMD", "curl", "http://localhost:8080"]
61+
interval: 6s
62+
timeout: 2s
63+
retries: 3

.github/workflows/ci.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,18 @@ jobs:
140140
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
141141
az storage container create -n ${AZURE_TEST_CONTAINER_NAME}2 --connection-string $AZURE_STORAGE_CONNECTION_STRING
142142
143+
- name: Start local web server for http(s) tests
144+
run: |
145+
docker run -d \
146+
--env-file .devcontainer/.env \
147+
-p 8080:80 \
148+
rclone/rclone serve webdav /data --addr :80
149+
150+
while ! curl $HTTP_ENDPOINT; do
151+
echo "Waiting for $HTTP_ENDPOINT..."
152+
sleep 1
153+
done
154+
143155
- name: Run tests
144156
run: |
145157
# Run tests with coverage tool

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ aws-credential-types = {version = "1", default-features = false}
2828
azure_storage = {version = "0.21", default-features = false}
2929
futures = "0.3"
3030
home = "0.5"
31-
object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]}
31+
object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "http"]}
3232
once_cell = "1"
3333
parquet = {version = "54", default-features = false, features = [
3434
"arrow",

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM
156156
```
157157

158158
## Object Store Support
159-
`pg_parquet` supports reading and writing Parquet files from/to `S3` and `Azure Blob Storage` object stores.
159+
`pg_parquet` supports reading and writing Parquet files from/to `S3`, `Azure Blob Storage` and `http(s)` object stores.
160160

161161
> [!NOTE]
162162
> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user.
@@ -239,6 +239,10 @@ Supported authorization methods' priority order is shown below:
239239
2. Sas token,
240240
3. Storage key.
241241

242+
#### Http(s) Storage
243+
244+
`https` URIs are supported by default. You can set the `ALLOW_HTTP` environment variable to `true` to also allow `http` URIs.
245+
242246
## Copy Options
243247
`pg_parquet` supports the following options in the `COPY TO` command:
244248
- `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,

src/arrow_parquet/uri_utils.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use url::Url;
1919

2020
use crate::{
2121
object_store::{
22-
aws::parse_s3_bucket, azure::parse_azure_blob_container,
22+
aws::parse_s3_bucket, azure::parse_azure_blob_container, http::parse_http_base_uri,
2323
object_store_cache::get_or_create_object_store,
2424
},
2525
PG_BACKEND_TOKIO_RUNTIME,
@@ -50,7 +50,7 @@ impl ParsedUriInfo {
5050
fn try_parse_scheme(uri: &Url) -> Result<(ObjectStoreScheme, Path), String> {
5151
ObjectStoreScheme::parse(uri).map_err(|_| {
5252
format!(
53-
"unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
53+
"unrecognized uri {}. pg_parquet supports local paths, https://, s3:// or az:// schemes.",
5454
uri
5555
)
5656
})
@@ -64,8 +64,11 @@ impl ParsedUriInfo {
6464
ObjectStoreScheme::MicrosoftAzure => parse_azure_blob_container(uri)
6565
.ok_or(format!("unsupported azure blob storage uri: {uri}"))
6666
.map(Some),
67+
ObjectStoreScheme::Http => parse_http_base_uri(uri).
68+
ok_or(format!("unsupported http storage uri: {uri}"))
69+
.map(Some),
6770
ObjectStoreScheme::Local => Ok(None),
68-
_ => Err(format!("unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
71+
_ => Err(format!("unsupported scheme {} in uri {}. pg_parquet supports local paths, https://, s3:// or az:// schemes.",
6972
uri.scheme(), uri))
7073
}
7174
}

src/object_store.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ use crate::{
99

1010
pub(crate) mod aws;
1111
pub(crate) mod azure;
12+
pub(crate) mod http;
1213
pub(crate) mod local_file;
1314
pub(crate) mod object_store_cache;

src/object_store/http.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::sync::Arc;
2+
3+
use object_store::{http::HttpBuilder, ClientOptions};
4+
use url::Url;
5+
6+
use super::object_store_cache::ObjectStoreWithExpiration;
7+
8+
// create_http_object_store creates a http(s) object store with the given bucket name.
9+
pub(crate) fn create_http_object_store(uri: &Url) -> ObjectStoreWithExpiration {
10+
let base_uri = parse_http_base_uri(uri).unwrap_or_else(|| {
11+
panic!("unsupported http uri: {}", uri);
12+
});
13+
14+
let allow_http = std::env::var("ALLOW_HTTP").is_ok();
15+
16+
let client_options = ClientOptions::new()
17+
.with_allow_http2()
18+
.with_allow_http(allow_http);
19+
20+
let http_builder = HttpBuilder::new()
21+
.with_url(base_uri)
22+
.with_client_options(client_options);
23+
24+
let object_store = http_builder.build().unwrap_or_else(|e| panic!("{}", e));
25+
26+
let expire_at = None;
27+
28+
ObjectStoreWithExpiration {
29+
object_store: Arc::new(object_store),
30+
expire_at,
31+
}
32+
}
33+
34+
pub(crate) fn parse_http_base_uri(uri: &Url) -> Option<String> {
35+
let scheme = uri.scheme();
36+
37+
let host = uri.host_str().expect("http uri missing host");
38+
39+
let port = uri.port().map(|p| format!(":{}", p)).unwrap_or_default();
40+
41+
Some(format!("{}://{}{}", scheme, host, port))
42+
}

src/object_store/object_store_cache.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use url::Url;
1212

1313
use crate::arrow_parquet::uri_utils::ParsedUriInfo;
1414

15-
use super::{create_azure_object_store, create_local_file_object_store, create_s3_object_store};
15+
use super::{
16+
create_azure_object_store, create_local_file_object_store, create_s3_object_store,
17+
http::create_http_object_store,
18+
};
1619

1720
// OBJECT_STORE_CACHE is a global cache for object stores per Postgres session.
1821
// It caches object stores based on the scheme and bucket.
@@ -86,9 +89,10 @@ impl ObjectStoreCache {
8689
match scheme {
8790
ObjectStoreScheme::AmazonS3 => create_s3_object_store(uri),
8891
ObjectStoreScheme::MicrosoftAzure => create_azure_object_store(uri),
92+
ObjectStoreScheme::Http => create_http_object_store(uri),
8993
ObjectStoreScheme::Local => create_local_file_object_store(uri, copy_from),
9094
_ => panic!(
91-
"unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
95+
"unsupported scheme {} in uri {}. pg_parquet supports local paths, https, s3:// or az:// schemes.",
9296
uri.scheme(),
9397
uri
9498
),

src/pgrx_tests/object_store.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,26 @@ mod tests {
630630
Spi::run("copy test_table from 'https://ACCOUNT.dfs.fabric.microsoft.com';").unwrap();
631631
}
632632

633+
#[pg_test]
634+
fn test_http_uri() {
635+
object_store_cache_clear();
636+
637+
let http_endpoint: String =
638+
std::env::var("HTTP_ENDPOINT").expect("HTTP_ENDPOINT not found");
639+
640+
let http_uris = [
641+
format!("{http_endpoint}/pg_parquet_test.parquet"),
642+
format!("{http_endpoint}/dummy/pg_parquet_test"),
643+
];
644+
645+
for http_uri in http_uris {
646+
let test_table = TestTable::<i32>::new("int4".into()).with_uri(http_uri);
647+
648+
test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
649+
test_table.assert_expected_and_result_rows();
650+
}
651+
}
652+
633653
#[pg_test]
634654
#[should_panic(expected = "relative path not allowed")]
635655
fn test_copy_to_unsupported_scheme() {
@@ -759,5 +779,50 @@ mod tests {
759779
("MicrosoftAzure", "testcontainer2", None)
760780
]
761781
);
782+
783+
// https scheme and base uri
784+
let http_uri =
785+
"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet";
786+
Spi::run(format!("SELECT * FROM parquet.schema('{http_uri}')").as_str()).unwrap();
787+
788+
assert_eq!(
789+
object_store_cache_items(),
790+
vec![
791+
("AmazonS3", "testbucket", None),
792+
("AmazonS3", "testbucket2", None),
793+
("MicrosoftAzure", "testcontainer", None),
794+
("HttpStore", "https://d37ci6vzurychx.cloudfront.net", None)
795+
]
796+
);
797+
798+
// https scheme and same base uri
799+
let http_uri =
800+
"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet";
801+
Spi::run(format!("SELECT * FROM parquet.schema('{http_uri}')").as_str()).unwrap();
802+
803+
assert_eq!(
804+
object_store_cache_items(),
805+
vec![
806+
("AmazonS3", "testbucket", None),
807+
("AmazonS3", "testbucket2", None),
808+
("MicrosoftAzure", "testcontainer", None),
809+
("HttpStore", "https://d37ci6vzurychx.cloudfront.net", None)
810+
]
811+
);
812+
813+
// https scheme and different base uri
814+
let http_uri = "https://www.filesampleshub.com/download/code/parquet/sample1.parquet";
815+
Spi::run(format!("SELECT * FROM parquet.schema('{http_uri}')").as_str()).unwrap();
816+
817+
assert_eq!(
818+
object_store_cache_items(),
819+
vec![
820+
("AmazonS3", "testbucket", None),
821+
("AmazonS3", "testbucket2", None),
822+
("MicrosoftAzure", "testcontainer", None),
823+
("Http", "https://d37ci6vzurychx.cloudfront.net", None),
824+
("HttpStore", "https://www.filesampleshub.com", None)
825+
]
826+
);
762827
}
763828
}

0 commit comments

Comments
 (0)