Skip to content

Commit c1ba6ad

Browse files
committed
Adds Support for COPY TO/FROM Google Cloud Storage
Supports the following Google Cloud Storage uri forms: - gs:// \<bucket\> / \<path\> **Configuration** The simplest way to configure object storage is by creating a json config file like `/tmp/gcs.json`: ```bash $ cat /tmp/gcs.json { "gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key_id": "", "private_key": "" } ``` Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client: - `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key - `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file
1 parent 2a3061f commit c1ba6ad

File tree

10 files changed

+145
-7
lines changed

10 files changed

+145
-7
lines changed

.devcontainer/.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ AZURE_TEST_CONTAINER_NAME=testcontainer
1414
AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D"
1515
AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"
1616

17+
# GCS tests
18+
GOOGLE_TEST_BUCKET=testbucket
19+
1720
# Others
1821
RUST_TEST_THREADS=1
1922
PG_PARQUET_TEST=true

.devcontainer/create-test-buckets.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@
33
aws --endpoint-url http://localhost:9000 s3 mb s3://$AWS_S3_TEST_BUCKET
44

55
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
6+
7+
curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"

.devcontainer/docker-compose.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ services:
2121
depends_on:
2222
- minio
2323
- azurite
24+
- fake-gcs-server
2425

2526
minio:
2627
image: minio/minio
@@ -46,3 +47,16 @@ services:
4647
interval: 6s
4748
timeout: 2s
4849
retries: 3
50+
51+
fake-gcs-server:
52+
image: tustvold/fake-gcs-server
53+
env_file:
54+
- .env
55+
network_mode: host
56+
command: -scheme http -public-host localhost:4443
57+
restart: unless-stopped
58+
healthcheck:
59+
test: ["CMD", "curl", "http://localhost:4443"]
60+
interval: 6s
61+
timeout: 2s
62+
retries: 3

.github/workflows/ci.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,17 @@ jobs:
132132
133133
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
134134
135+
- name: Start fake-gcs-server for Google Cloud Storage emulator tests
136+
run: |
137+
docker run -d --env-file .devcontainer/.env -p 4443:4443 tustvold/fake-gcs-server -scheme http -filesystem-root /tmp/gcs -public-host localhost:4443
138+
139+
while ! nc -z localhost 4443; do
140+
echo "Waiting for localhost:4443..."
141+
sleep 1
142+
done
143+
144+
curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
145+
135146
- name: Run tests
136147
run: |
137148
# Run tests with coverage tool

.vscode/settings.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@
55
"rust-analyzer.checkOnSave": true,
66
"editor.inlayHints.enabled": "offUnlessPressed",
77
"files.watcherExclude": {
8-
"**/target/**": true
9-
}
8+
"**/target/**": true
9+
}
1010
}

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ aws-config = { version = "1.5", default-features = false, features = ["rustls"]}
2626
aws-credential-types = {version = "1.2", default-features = false}
2727
futures = "0.3"
2828
home = "0.5"
29-
object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]}
29+
object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "gcp"]}
3030
once_cell = "1"
3131
parquet = {version = "53", default-features = false, features = [
3232
"arrow",

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,28 @@ Supported Azure Blob Storage uri formats are shown below:
212212
- azure:// \<container\> / \<path\>
213213
- https:// \<account\>.blob.core.windows.net / \<container\> / \<path\>
214214

215+
#### Google Cloud Storage
216+
217+
The simplest way to configure object storage is by creating a json config file like `/tmp/gcs.json`:
218+
219+
```bash
220+
$ cat /tmp/gcs.json
221+
{
222+
"gcs_base_url": "http://localhost:4443",
223+
"disable_oauth": true,
224+
"client_email": "",
225+
"private_key_id": "",
226+
"private_key": ""
227+
}
228+
```
229+
230+
Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
231+
- `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key
232+
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file
233+
234+
Supported Google Cloud Storage uri formats are shown below:
235+
- gs:// \<bucket\> / \<path\>
236+
215237
## Copy Options
216238
`pg_parquet` supports the following options in the `COPY TO` command:
217239
- `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension. (This is the only option that `COPY FROM` command supports.),

src/arrow_parquet/uri_utils.rs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use ini::Ini;
1515
use object_store::{
1616
aws::{AmazonS3, AmazonS3Builder},
1717
azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder},
18+
gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder},
1819
local::LocalFileSystem,
1920
path::Path,
2021
ObjectStore, ObjectStoreScheme,
@@ -96,6 +97,17 @@ fn parse_s3_bucket(uri: &Url) -> Option<String> {
9697
None
9798
}
9899

100+
fn parse_gcs_bucket(uri: &Url) -> Option<String> {
101+
let host = uri.host_str()?;
102+
103+
// gs://{bucket}/key
104+
if uri.scheme() == "gs" {
105+
return Some(host.to_string());
106+
}
107+
108+
None
109+
}
110+
99111
fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
100112
let (scheme, path) =
101113
ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri));
@@ -121,6 +133,16 @@ fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStor
121133

122134
(storage_container, path)
123135
}
136+
ObjectStoreScheme::GoogleCloudStorage => {
137+
let bucket_name = parse_gcs_bucket(uri).unwrap_or_else(|| {
138+
panic!("failed to parse bucket name from uri: {}", uri);
139+
});
140+
141+
let storage_container = PG_BACKEND_TOKIO_RUNTIME
142+
.block_on(async { Arc::new(get_gcs_object_store(&bucket_name).await) });
143+
144+
(storage_container, path)
145+
}
124146
ObjectStoreScheme::Local => {
125147
let uri = uri_as_string(uri);
126148

@@ -262,6 +284,25 @@ async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure {
262284
azure_builder.build().unwrap_or_else(|e| panic!("{}", e))
263285
}
264286

287+
async fn get_gcs_object_store(bucket_name: &str) -> GoogleCloudStorage {
288+
let mut gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
289+
290+
if is_testing() {
291+
// use fake-gcs-server for testing
292+
gcs_builder = gcs_builder.with_service_account_key(
293+
"{
294+
\"gcs_base_url\": \"http://localhost:4443\",
295+
\"disable_oauth\": true,
296+
\"client_email\": \"\",
297+
\"private_key_id\": \"\",
298+
\"private_key\": \"\"
299+
}",
300+
);
301+
}
302+
303+
gcs_builder.build().unwrap_or_else(|e| panic!("{}", e))
304+
}
305+
265306
fn is_testing() -> bool {
266307
std::env::var("PG_PARQUET_TEST").is_ok()
267308
}
@@ -284,13 +325,20 @@ pub(crate) fn parse_uri(uri: &str) -> Url {
284325
} else if scheme == ObjectStoreScheme::MicrosoftAzure {
285326
parse_azure_blob_container(&uri).unwrap_or_else(|| {
286327
panic!(
287-
"failed to parse container name from azure blob storage uri {}",
328+
"failed to parse container name from Azure Blob Storage uri {}",
329+
uri
330+
)
331+
});
332+
} else if scheme == ObjectStoreScheme::GoogleCloudStorage {
333+
parse_gcs_bucket(&uri).unwrap_or_else(|| {
334+
panic!(
335+
"failed to parse bucket name from Google Cloud Storage uri {}",
288336
uri
289337
)
290338
});
291339
} else {
292340
panic!(
293-
"unsupported uri {}. Only Azure and S3 uris are supported.",
341+
"unsupported uri {}. Only Azure Blob Storage, S3 and Google Cloud Storage uris are supported.",
294342
uri
295343
);
296344
};

src/lib.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1932,10 +1932,47 @@ mod tests {
19321932
}
19331933

19341934
#[pg_test]
1935-
#[should_panic(expected = "unsupported uri gs://testbucket")]
1935+
fn test_gcs_from_env() {
1936+
let test_bucket_name: String =
1937+
std::env::var("GOOGLE_TEST_BUCKET").expect("GOOGLE_TEST_BUCKET not found");
1938+
1939+
let gcs_uri = format!("gs://{}/pg_parquet_test.parquet", test_bucket_name);
1940+
1941+
let test_table = TestTable::<i32>::new("int4".into()).with_uri(gcs_uri);
1942+
1943+
test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
1944+
test_helper(test_table);
1945+
}
1946+
1947+
#[pg_test]
1948+
#[should_panic(expected = "404 Not Found")]
1949+
fn test_gcs_write_wrong_bucket() {
1950+
let s3_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";
1951+
1952+
let copy_to_command = format!(
1953+
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}';",
1954+
s3_uri
1955+
);
1956+
Spi::run(copy_to_command.as_str()).unwrap();
1957+
}
1958+
1959+
#[pg_test]
1960+
#[should_panic(expected = "404 Not Found")]
1961+
fn test_gcs_read_wrong_bucket() {
1962+
let gcs_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";
1963+
1964+
let create_table_command = "CREATE TABLE test_table (a int);";
1965+
Spi::run(create_table_command).unwrap();
1966+
1967+
let copy_from_command = format!("COPY test_table FROM '{}';", gcs_uri);
1968+
Spi::run(copy_from_command.as_str()).unwrap();
1969+
}
1970+
1971+
#[pg_test]
1972+
#[should_panic(expected = "unsupported uri http://testbucket")]
19361973
fn test_unsupported_uri() {
19371974
let test_table =
1938-
TestTable::<i32>::new("int4".into()).with_uri("gs://testbucket".to_string());
1975+
TestTable::<i32>::new("int4".into()).with_uri("http://testbucket".to_string());
19391976
test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
19401977
test_helper(test_table);
19411978
}

0 commit comments

Comments
 (0)