1- use std:: {
2- panic,
3- sync:: { Arc , LazyLock } ,
4- } ;
1+ use std:: { panic, sync:: Arc } ;
52
63use arrow:: datatypes:: SchemaRef ;
7- use aws_config:: BehaviorVersion ;
8- use aws_credential_types:: provider:: ProvideCredentials ;
9- use home:: home_dir;
10- use ini:: Ini ;
11- use object_store:: {
12- aws:: { AmazonS3 , AmazonS3Builder } ,
13- azure:: { AzureConfigKey , MicrosoftAzure , MicrosoftAzureBuilder } ,
14- local:: LocalFileSystem ,
15- path:: Path ,
16- ObjectStore , ObjectStoreScheme ,
17- } ;
184use parquet:: {
195 arrow:: {
206 arrow_to_parquet_schema,
@@ -29,229 +15,16 @@ use pgrx::{
2915 ereport,
3016 pg_sys:: { get_role_oid, has_privs_of_role, superuser, AsPgCStr , GetUserId } ,
3117} ;
32- use tokio:: runtime:: Runtime ;
3318use url:: Url ;
3419
35- use crate :: arrow_parquet:: parquet_writer:: DEFAULT_ROW_GROUP_SIZE ;
20+ use crate :: {
21+ arrow_parquet:: parquet_writer:: DEFAULT_ROW_GROUP_SIZE , object_store:: create_object_store,
22+ PG_BACKEND_TOKIO_RUNTIME ,
23+ } ;
3624
3725const PARQUET_OBJECT_STORE_READ_ROLE : & str = "parquet_object_store_read" ;
3826const PARQUET_OBJECT_STORE_WRITE_ROLE : & str = "parquet_object_store_write" ;
3927
40- // PG_BACKEND_TOKIO_RUNTIME creates a tokio runtime that uses the current thread
41- // to run the tokio reactor. This uses the same thread that is running the Postgres backend.
42- pub ( crate ) static PG_BACKEND_TOKIO_RUNTIME : LazyLock < Runtime > = LazyLock :: new ( || {
43- tokio:: runtime:: Builder :: new_current_thread ( )
44- . enable_all ( )
45- . build ( )
46- . unwrap_or_else ( |e| panic ! ( "failed to create tokio runtime: {}" , e) )
47- } ) ;
48-
49- fn parse_azure_blob_container ( uri : & Url ) -> Option < String > {
50- let host = uri. host_str ( ) ?;
51-
52- // az(ure)://{container}/key
53- if uri. scheme ( ) == "az" || uri. scheme ( ) == "azure" {
54- return Some ( host. to_string ( ) ) ;
55- }
56- // https://{account}.blob.core.windows.net/{container}
57- else if host. ends_with ( ".blob.core.windows.net" ) {
58- let path_segments: Vec < & str > = uri. path_segments ( ) ?. collect ( ) ;
59-
60- // Container name is the first part of the path
61- return Some (
62- path_segments
63- . first ( )
64- . expect ( "unexpected error during parsing azure blob uri" )
65- . to_string ( ) ,
66- ) ;
67- }
68-
69- None
70- }
71-
72- fn parse_s3_bucket ( uri : & Url ) -> Option < String > {
73- let host = uri. host_str ( ) ?;
74-
75- // s3(a)://{bucket}/key
76- if uri. scheme ( ) == "s3" || uri. scheme ( ) == "s3a" {
77- return Some ( host. to_string ( ) ) ;
78- }
79- // https://s3.amazonaws.com/{bucket}/key
80- else if host == "s3.amazonaws.com" {
81- let path_segments: Vec < & str > = uri. path_segments ( ) ?. collect ( ) ;
82-
83- // Bucket name is the first part of the path
84- return Some (
85- path_segments
86- . first ( )
87- . expect ( "unexpected error during parsing s3 uri" )
88- . to_string ( ) ,
89- ) ;
90- }
91- // https://{bucket}.s3.amazonaws.com/key
92- else if host. ends_with ( ".s3.amazonaws.com" ) {
93- let bucket_name = host. split ( '.' ) . next ( ) ?;
94- return Some ( bucket_name. to_string ( ) ) ;
95- }
96-
97- None
98- }
99-
100- fn object_store_with_location ( uri : & Url , copy_from : bool ) -> ( Arc < dyn ObjectStore > , Path ) {
101- let ( scheme, path) =
102- ObjectStoreScheme :: parse ( uri) . unwrap_or_else ( |_| panic ! ( "unrecognized uri {}" , uri) ) ;
103-
104- // object_store crate can recognize a bunch of different schemes and paths, but we only support
105- // local, azure, and s3 schemes with a subset of all supported paths.
106- match scheme {
107- ObjectStoreScheme :: AmazonS3 => {
108- let bucket_name = parse_s3_bucket ( uri) . unwrap_or_else ( || {
109- panic ! ( "unsupported s3 uri: {}" , uri) ;
110- } ) ;
111-
112- let storage_container = PG_BACKEND_TOKIO_RUNTIME
113- . block_on ( async { Arc :: new ( get_s3_object_store ( & bucket_name) . await ) } ) ;
114-
115- ( storage_container, path)
116- }
117- ObjectStoreScheme :: MicrosoftAzure => {
118- let container_name = parse_azure_blob_container ( uri) . unwrap_or_else ( || {
119- panic ! ( "unsupported azure blob storage uri: {}" , uri) ;
120- } ) ;
121-
122- let storage_container = PG_BACKEND_TOKIO_RUNTIME
123- . block_on ( async { Arc :: new ( get_azure_object_store ( & container_name) . await ) } ) ;
124-
125- ( storage_container, path)
126- }
127- ObjectStoreScheme :: Local => {
128- let uri = uri_as_string ( uri) ;
129-
130- if !copy_from {
131- // create or overwrite the local file
132- std:: fs:: OpenOptions :: new ( )
133- . write ( true )
134- . truncate ( true )
135- . create ( true )
136- . open ( & uri)
137- . unwrap_or_else ( |e| panic ! ( "{}" , e) ) ;
138- }
139-
140- let storage_container = Arc :: new ( LocalFileSystem :: new ( ) ) ;
141-
142- let path = Path :: from_filesystem_path ( & uri) . unwrap_or_else ( |e| panic ! ( "{}" , e) ) ;
143-
144- ( storage_container, path)
145- }
146- _ => {
147- panic ! ( "unsupported scheme {} in uri {}" , uri. scheme( ) , uri) ;
148- }
149- }
150- }
151-
152- // get_s3_object_store creates an AmazonS3 object store with the given bucket name.
153- // It is configured by environment variables and aws config files as fallback method.
154- // We need to read the config files to make the fallback method work since object_store
155- // does not provide a way to read them. Currently, we only support to extract
156- // "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN", "AWS_ENDPOINT_URL",
157- // and "AWS_REGION" from the config files.
158- async fn get_s3_object_store ( bucket_name : & str ) -> AmazonS3 {
159- let mut aws_s3_builder = AmazonS3Builder :: from_env ( ) . with_bucket_name ( bucket_name) ;
160-
161- // first tries environment variables and then the config files
162- let sdk_config = aws_config:: defaults ( BehaviorVersion :: v2024_03_28 ( ) )
163- . load ( )
164- . await ;
165-
166- if let Some ( credential_provider) = sdk_config. credentials_provider ( ) {
167- if let Ok ( credentials) = credential_provider. provide_credentials ( ) . await {
168- // AWS_ACCESS_KEY_ID
169- aws_s3_builder = aws_s3_builder. with_access_key_id ( credentials. access_key_id ( ) ) ;
170-
171- // AWS_SECRET_ACCESS_KEY
172- aws_s3_builder = aws_s3_builder. with_secret_access_key ( credentials. secret_access_key ( ) ) ;
173-
174- if let Some ( token) = credentials. session_token ( ) {
175- // AWS_SESSION_TOKEN
176- aws_s3_builder = aws_s3_builder. with_token ( token) ;
177- }
178- }
179- }
180-
181- // AWS_ENDPOINT_URL
182- if let Some ( aws_endpoint_url) = sdk_config. endpoint_url ( ) {
183- aws_s3_builder = aws_s3_builder. with_endpoint ( aws_endpoint_url) ;
184- }
185-
186- // AWS_REGION
187- if let Some ( aws_region) = sdk_config. region ( ) {
188- aws_s3_builder = aws_s3_builder. with_region ( aws_region. as_ref ( ) ) ;
189- }
190-
191- aws_s3_builder. build ( ) . unwrap_or_else ( |e| panic ! ( "{}" , e) )
192- }
193-
194- async fn get_azure_object_store ( container_name : & str ) -> MicrosoftAzure {
195- let mut azure_builder = MicrosoftAzureBuilder :: from_env ( ) . with_container_name ( container_name) ;
196-
197- // ~/.azure/config
198- let azure_config_file_path = std:: env:: var ( "AZURE_CONFIG_FILE" ) . unwrap_or (
199- home_dir ( )
200- . expect ( "failed to get home directory" )
201- . join ( ".azure" )
202- . join ( "config" )
203- . to_str ( )
204- . expect ( "failed to convert path to string" )
205- . to_string ( ) ,
206- ) ;
207-
208- let azure_config_content = Ini :: load_from_file ( & azure_config_file_path) . ok ( ) ;
209-
210- // storage account
211- let azure_blob_account = match std:: env:: var ( "AZURE_STORAGE_ACCOUNT" ) {
212- Ok ( account) => Some ( account) ,
213- Err ( _) => azure_config_content
214- . as_ref ( )
215- . and_then ( |ini| ini. section ( Some ( "storage" ) ) )
216- . and_then ( |section| section. get ( "account" ) )
217- . map ( |account| account. to_string ( ) ) ,
218- } ;
219-
220- if let Some ( azure_blob_account) = azure_blob_account {
221- azure_builder = azure_builder. with_account ( azure_blob_account) ;
222- }
223-
224- // storage key
225- let azure_blob_key = match std:: env:: var ( "AZURE_STORAGE_KEY" ) {
226- Ok ( key) => Some ( key) ,
227- Err ( _) => azure_config_content
228- . as_ref ( )
229- . and_then ( |ini| ini. section ( Some ( "storage" ) ) )
230- . and_then ( |section| section. get ( "key" ) )
231- . map ( |key| key. to_string ( ) ) ,
232- } ;
233-
234- if let Some ( azure_blob_key) = azure_blob_key {
235- azure_builder = azure_builder. with_access_key ( azure_blob_key) ;
236- }
237-
238- // sas token
239- let azure_blob_sas_token = match std:: env:: var ( "AZURE_STORAGE_SAS_TOKEN" ) {
240- Ok ( token) => Some ( token) ,
241- Err ( _) => azure_config_content
242- . as_ref ( )
243- . and_then ( |ini| ini. section ( Some ( "storage" ) ) )
244- . and_then ( |section| section. get ( "sas_token" ) )
245- . map ( |token| token. to_string ( ) ) ,
246- } ;
247-
248- if let Some ( azure_blob_sas_token) = azure_blob_sas_token {
249- azure_builder = azure_builder. with_config ( AzureConfigKey :: SasKey , azure_blob_sas_token) ;
250- }
251-
252- azure_builder. build ( ) . unwrap_or_else ( |e| panic ! ( "{}" , e) )
253- }
254-
25528pub ( crate ) fn parse_uri ( uri : & str ) -> Url {
25629 if !uri. contains ( "://" ) {
25730 // local file
@@ -285,7 +58,7 @@ pub(crate) fn parquet_schema_from_uri(uri: &Url) -> SchemaDescriptor {
28558
28659pub ( crate ) fn parquet_metadata_from_uri ( uri : & Url ) -> Arc < ParquetMetaData > {
28760 let copy_from = true ;
288- let ( parquet_object_store, location) = object_store_with_location ( uri, copy_from) ;
61+ let ( parquet_object_store, location) = create_object_store ( uri, copy_from) ;
28962
29063 PG_BACKEND_TOKIO_RUNTIME . block_on ( async {
29164 let object_store_meta = parquet_object_store
@@ -308,7 +81,7 @@ pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
30881
30982pub ( crate ) fn parquet_reader_from_uri ( uri : & Url ) -> ParquetRecordBatchStream < ParquetObjectReader > {
31083 let copy_from = true ;
311- let ( parquet_object_store, location) = object_store_with_location ( uri, copy_from) ;
84+ let ( parquet_object_store, location) = create_object_store ( uri, copy_from) ;
31285
31386 PG_BACKEND_TOKIO_RUNTIME . block_on ( async {
31487 let object_store_meta = parquet_object_store
@@ -340,7 +113,7 @@ pub(crate) fn parquet_writer_from_uri(
340113 writer_props : WriterProperties ,
341114) -> AsyncArrowWriter < ParquetObjectWriter > {
342115 let copy_from = false ;
343- let ( parquet_object_store, location) = object_store_with_location ( uri, copy_from) ;
116+ let ( parquet_object_store, location) = create_object_store ( uri, copy_from) ;
344117
345118 let parquet_object_writer = ParquetObjectWriter :: new ( parquet_object_store, location) ;
346119
0 commit comments