
Commit 17d4aa8

Cache object stores per scheme + bucket
In this PR, we cache object stores per scheme (e.g. s3) + bucket (container) combination in each Postgres session. This reduces authentication costs by authenticating only the first time a store is used. object_store does not perform STS assume_role to get a temporary token, so pg_parquet uses the AWS SDK to perform it and then configures object_store with the fetched temporary token. This is why pg_parquet checks token expiration for the s3 store: it fetches a new temporary token when the cached one expires (assuming you configured temporary-token auth). For the Azure blob store, we do not need to recreate tokens because object_store handles the refresh itself.
1 parent b8f9061 commit 17d4aa8
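The new src/object_store/object_store_cache.rs module, where the cache itself lives, is not shown in the excerpt below. For orientation, here is a minimal sketch of the scheme it implements. The names ObjectStoreWithExpiration and get_or_create_object_store come from the diff; the thread-local HashMap keyed by (scheme, bucket) and the InMemory stand-in store are illustrative assumptions, not the commit's actual code.

use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;

use object_store::{memory::InMemory, path::Path, ObjectStore, ObjectStoreScheme};
use url::Url;

// A store plus the instant its credentials expire (None = never expires).
pub struct ObjectStoreWithExpiration {
    pub object_store: Arc<dyn ObjectStore>,
    pub expire_at: Option<SystemTime>,
}

thread_local! {
    // Session-local cache keyed by (scheme, bucket). A Postgres backend is
    // single-threaded, so thread-local storage suffices.
    static STORE_CACHE: RefCell<HashMap<(String, String), ObjectStoreWithExpiration>> =
        RefCell::new(HashMap::new());
}

pub fn get_or_create_object_store(uri: &Url, _copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
    let (scheme, path) = ObjectStoreScheme::parse(uri).expect("unrecognized uri");
    let key = (format!("{scheme:?}"), uri.host_str().unwrap_or("").to_string());

    STORE_CACHE.with(|cache| {
        let mut cache = cache.borrow_mut();

        // Evict a cached store whose temporary credentials have expired,
        // so a fresh token is fetched below.
        if cache
            .get(&key)
            .and_then(|entry| entry.expire_at)
            .is_some_and(|expire_at| expire_at <= SystemTime::now())
        {
            cache.remove(&key);
        }

        let entry = cache.entry(key).or_insert_with(|| ObjectStoreWithExpiration {
            // Stand-in for the real per-scheme constructors
            // (create_s3_object_store, create_azure_object_store, ...).
            object_store: Arc::new(InMemory::new()),
            expire_at: None,
        });

        (Arc::clone(&entry.object_store), path)
    })
}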

File tree

10 files changed (+452 / -64 lines)


.devcontainer/entrypoint.sh

Lines changed: 1 addition & 0 deletions
@@ -4,5 +4,6 @@ trap "echo 'Caught termination signal. Exiting...'; exit 0" SIGINT SIGTERM
 
 # create azurite container
 az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
+az storage container create -n ${AZURE_TEST_CONTAINER_NAME}2 --connection-string $AZURE_STORAGE_CONNECTION_STRING
 
 sleep infinity

.devcontainer/minio-entrypoint.sh

Lines changed: 2 additions & 1 deletion
@@ -14,7 +14,8 @@ done
 # set access key and secret key
 mc alias set local $AWS_ENDPOINT_URL $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD
 
-# create bucket
+# create buckets
 mc mb local/$AWS_S3_TEST_BUCKET
+mc mb local/${AWS_S3_TEST_BUCKET}2
 
 wait $minio_pid

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
@@ -138,6 +138,7 @@ jobs:
 
       # create container
       az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
+      az storage container create -n ${AZURE_TEST_CONTAINER_NAME}2 --connection-string $AZURE_STORAGE_CONNECTION_STRING
 
     - name: Run tests
       run: |

src/arrow_parquet/uri_utils.rs

Lines changed: 5 additions & 5 deletions
@@ -18,8 +18,8 @@ use pgrx::{
 use url::Url;
 
 use crate::{
-    arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE, object_store::create_object_store,
-    PG_BACKEND_TOKIO_RUNTIME,
+    arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE,
+    object_store::object_store_cache::get_or_create_object_store, PG_BACKEND_TOKIO_RUNTIME,
 };
 
 const PARQUET_OBJECT_STORE_READ_ROLE: &str = "parquet_object_store_read";
@@ -58,7 +58,7 @@ pub(crate) fn parquet_schema_from_uri(uri: &Url) -> SchemaDescriptor {
 
 pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
     let copy_from = true;
-    let (parquet_object_store, location) = create_object_store(uri, copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
 
     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
@@ -81,7 +81,7 @@ pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
 
 pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<ParquetObjectReader> {
     let copy_from = true;
-    let (parquet_object_store, location) = create_object_store(uri, copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
 
     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
@@ -113,7 +113,7 @@ pub(crate) fn parquet_writer_from_uri(
     writer_props: WriterProperties,
 ) -> AsyncArrowWriter<ParquetObjectWriter> {
     let copy_from = false;
-    let (parquet_object_store, location) = create_object_store(uri, copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
 
     let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);
 

src/object_store.rs

Lines changed: 1 addition & 44 deletions
@@ -1,8 +1,3 @@
-use std::sync::Arc;
-
-use object_store::{path::Path, ObjectStore, ObjectStoreScheme};
-use url::Url;
-
 use crate::{
     arrow_parquet::uri_utils::uri_as_string,
     object_store::{
@@ -15,42 +10,4 @@ use crate::{
 pub(crate) mod aws;
 pub(crate) mod azure;
 pub(crate) mod local_file;
-
-pub(crate) fn create_object_store(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
-    let (scheme, path) = ObjectStoreScheme::parse(uri).unwrap_or_else(|_| {
-        panic!(
-            "unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
-            uri
-        )
-    });
-
-    // object_store crate can recognize a bunch of different schemes and paths, but we only support
-    // local, azure, and s3 schemes with a subset of all supported paths.
-    match scheme {
-        ObjectStoreScheme::AmazonS3 => {
-            let storage_container = Arc::new(create_s3_object_store(uri));
-
-            (storage_container, path)
-        }
-        ObjectStoreScheme::MicrosoftAzure => {
-            let storage_container = Arc::new(create_azure_object_store(uri));
-
-            (storage_container, path)
-        }
-        ObjectStoreScheme::Local => {
-            let storage_container = Arc::new(create_local_file_object_store(uri, copy_from));
-
-            let path =
-                Path::from_filesystem_path(uri_as_string(uri)).unwrap_or_else(|e| panic!("{}", e));
-
-            (storage_container, path)
-        }
-        _ => {
-            panic!(
-                "unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
-                uri.scheme(),
-                uri
-            );
-        }
-    }
-}
+pub(crate) mod object_store_cache;

src/object_store/aws.rs

Lines changed: 18 additions & 5 deletions
@@ -1,9 +1,11 @@
+use std::{sync::Arc, time::SystemTime};
+
 use aws_config::BehaviorVersion;
 use aws_credential_types::provider::ProvideCredentials;
-use object_store::aws::{AmazonS3, AmazonS3Builder};
+use object_store::aws::AmazonS3Builder;
 use url::Url;
 
-use super::PG_BACKEND_TOKIO_RUNTIME;
+use super::{object_store_cache::ObjectStoreWithExpiration, PG_BACKEND_TOKIO_RUNTIME};
 
 // create_s3_object_store creates an AmazonS3 object store with the given bucket name.
 // It is configured by environment variables and aws config files as fallback method.
@@ -19,7 +21,7 @@ use super::PG_BACKEND_TOKIO_RUNTIME;
 // - AWS_CONFIG_FILE (env var only)
 // - AWS_PROFILE (env var only)
 // - AWS_ALLOW_HTTP (env var only, object_store specific)
-pub(crate) fn create_s3_object_store(uri: &Url) -> AmazonS3 {
+pub(crate) fn create_s3_object_store(uri: &Url) -> ObjectStoreWithExpiration {
     let bucket_name = parse_s3_bucket(uri).unwrap_or_else(|| {
         panic!("unsupported s3 uri: {}", uri);
     });
@@ -58,10 +60,17 @@ pub(crate) fn create_s3_object_store(uri: &Url) -> AmazonS3 {
         aws_s3_builder = aws_s3_builder.with_region(region);
     }
 
-    aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e))
+    let object_store = aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e));
+
+    let expire_at = aws_s3_config.expire_at;
+
+    ObjectStoreWithExpiration {
+        object_store: Arc::new(object_store),
+        expire_at,
+    }
 }
 
-fn parse_s3_bucket(uri: &Url) -> Option<String> {
+pub(crate) fn parse_s3_bucket(uri: &Url) -> Option<String> {
     let host = uri.host_str()?;
 
     // s3(a)://{bucket}/key
@@ -98,6 +107,7 @@ struct AwsS3Config {
     access_key_id: Option<String>,
     secret_access_key: Option<String>,
     session_token: Option<String>,
+    expire_at: Option<SystemTime>,
     endpoint_url: Option<String>,
     allow_http: bool,
 }
@@ -121,6 +131,7 @@ impl AwsS3Config {
         let mut access_key_id = None;
         let mut secret_access_key = None;
         let mut session_token = None;
+        let mut expire_at = None;
 
         if let Some(credential_provider) = sdk_config.credentials_provider() {
             if let Ok(credentials) = PG_BACKEND_TOKIO_RUNTIME
@@ -129,6 +140,7 @@ impl AwsS3Config {
                 access_key_id = Some(credentials.access_key_id().to_string());
                 secret_access_key = Some(credentials.secret_access_key().to_string());
                 session_token = credentials.session_token().map(|t| t.to_string());
+                expire_at = credentials.expiry();
             }
         }
 
@@ -141,6 +153,7 @@ impl AwsS3Config {
             access_key_id,
             secret_access_key,
             session_token,
+            expire_at,
             endpoint_url,
             allow_http,
         }
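For context, the expire_at threaded through this file comes from the AWS SDK's default credential chain. Here is a standalone sketch of that lookup, an illustration using the same aws-config and aws-credential-types crates the diff imports, not code from the commit:

use std::time::SystemTime;

use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;

// Load the default AWS credential chain (env vars, profile, assumed role)
// and return the token expiry, if the credentials are temporary.
async fn credential_expiry() -> Option<SystemTime> {
    let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
    let provider = sdk_config.credentials_provider()?;
    let credentials = provider.provide_credentials().await.ok()?;
    credentials.expiry()
}

Static access keys report no expiry (None), so a cached S3 store built from them lives for the whole session; only temporary credentials, e.g. from assume_role, trigger re-creation.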

src/object_store/azure.rs

Lines changed: 16 additions & 4 deletions
@@ -1,9 +1,13 @@
+use std::sync::Arc;
+
 use azure_storage::{ConnectionString, EndpointProtocol};
 use home::home_dir;
 use ini::Ini;
-use object_store::azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder};
+use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
 use url::Url;
 
+use super::object_store_cache::ObjectStoreWithExpiration;
+
 // create_azure_object_store creates a MicrosoftAzure object store with the given container name.
 // It is configured by environment variables and azure config files as fallback method.
 // We need to read the config files to make the fallback method work since object_store
@@ -16,7 +20,7 @@ use url::Url;
 // - AZURE_CONFIG_FILE (env var only, object_store specific)
 // - AZURE_STORAGE_ENDPOINT (env var only, object_store specific)
 // - AZURE_ALLOW_HTTP (env var only, object_store specific)
-pub(crate) fn create_azure_object_store(uri: &Url) -> MicrosoftAzure {
+pub(crate) fn create_azure_object_store(uri: &Url) -> ObjectStoreWithExpiration {
     let container_name = parse_azure_blob_container(uri).unwrap_or_else(|| {
         panic!("unsupported azure blob storage uri: {}", uri);
     });
@@ -63,10 +67,18 @@ pub(crate) fn create_azure_object_store(uri: &Url) -> MicrosoftAzure {
         azure_builder = azure_builder.with_client_secret(client_secret);
     }
 
-    azure_builder.build().unwrap_or_else(|e| panic!("{}", e))
+    let object_store = azure_builder.build().unwrap_or_else(|e| panic!("{}", e));
+
+    // object store handles refreshing bearer token, so we do not need to handle expiry here
+    let expire_at = None;
+
+    ObjectStoreWithExpiration {
+        object_store: Arc::new(object_store),
+        expire_at,
+    }
 }
 
-fn parse_azure_blob_container(uri: &Url) -> Option<String> {
+pub(crate) fn parse_azure_blob_container(uri: &Url) -> Option<String> {
     let host = uri.host_str()?;
 
     // az(ure)://{container}/key

src/object_store/local_file.rs

Lines changed: 14 additions & 3 deletions
@@ -1,10 +1,15 @@
+use std::sync::Arc;
+
 use object_store::local::LocalFileSystem;
 use url::Url;
 
-use super::uri_as_string;
+use super::{object_store_cache::ObjectStoreWithExpiration, uri_as_string};
 
 // create_local_file_object_store creates a LocalFileSystem object store with the given path.
-pub(crate) fn create_local_file_object_store(uri: &Url, copy_from: bool) -> LocalFileSystem {
+pub(crate) fn create_local_file_object_store(
+    uri: &Url,
+    copy_from: bool,
+) -> ObjectStoreWithExpiration {
     let path = uri_as_string(uri);
 
     if !copy_from {
@@ -17,5 +22,11 @@ pub(crate) fn create_local_file_object_store(uri: &Url, copy_from: bool) -> Loca
             .unwrap_or_else(|e| panic!("{}", e));
     }
 
-    LocalFileSystem::new()
+    let object_store = LocalFileSystem::new();
+    let expire_at = None;
+
+    ObjectStoreWithExpiration {
+        object_store: Arc::new(object_store),
+        expire_at,
+    }
 }
