Skip to content

Commit 82f4c04

Browse files
feat(S3): Conditional s3 upload and VersionID
Only upload to S3 when file checksum is different. Requires simultaneous changes in SCOAP3. Signed-off-by: [email protected]
1 parent 54d5c7d commit 82f4c04

File tree

6 files changed

+35907
-7
lines changed

6 files changed

+35907
-7
lines changed

dags/common/scoap3_s3.py

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import hashlib
12
import os
23
from uuid import uuid4
34

45
import requests
6+
from botocore.exceptions import ClientError
57
from common.repository import IRepository
68
from common.s3_service import S3Service
79
from structlog import get_logger
@@ -21,6 +23,11 @@ def update_filename_extension(filename, type):
2123
return f"{filename}{extension}"
2224

2325

26+
def get_file_checksum(data):
    """Calculate the MD5 checksum of file data.

    Args:
        data: Bytes-like object holding the complete file content.

    Returns:
        The MD5 digest of ``data`` as a lowercase hexadecimal string.
    """
    # MD5 is used as a change-detection fingerprint here, not for security.
    return hashlib.md5(data).hexdigest()
29+
30+
2431
class Scoap3Repository(IRepository):
2532
def __init__(self):
2633
super().__init__()
@@ -30,6 +37,31 @@ def __init__(self):
3037
self.s3 = S3Service(self.bucket)
3138
self.client = self.s3.meta.client
3239

40+
def file_exists_with_same_checksum(self, bucket, key, data=None):
    """Check if an object exists at the destination with the same checksum.

    The MD5 checksum of ``data`` is compared with the ETag that
    ``head_object`` reports for ``bucket``/``key`` (surrounding double
    quotes stripped).

    NOTE(review): this relies on the ETag being the MD5 of the object
    body. S3 does not guarantee that for multipart or KMS-encrypted
    uploads; in those cases the comparison is simply False and the caller
    uploads again. Confirm this is acceptable for this bucket.

    Args:
        bucket: Name of the bucket to look in.
        key: Key of the object to compare against.
        data: Bytes-like content to compare. ``None`` means there is
            nothing to compare.

    Returns:
        True only when the object exists and its ETag equals the MD5 of
        ``data``. False when ``data`` is None, the object is missing, the
        checksums differ, or the lookup fails for any reason. This method
        is best-effort and never raises.
    """
    # Test against None rather than truthiness so that an empty payload
    # (b"") is still compared instead of always being reported as different.
    if data is None:
        return False

    try:
        data_checksum = get_file_checksum(data)
        dest_response = self.client.head_object(Bucket=bucket, Key=key)
        dest_checksum = dest_response.get("ETag", "").strip('"')
        return data_checksum == dest_checksum
    except ClientError as e:
        error_code = e.response.get("Error", {}).get("Code")
        if error_code != "404":
            # Anything other than "object not found" is unexpected: log it,
            # but still answer False so the caller falls back to uploading.
            logger.error("Error checking file existence", error=str(e))
        return False
    except Exception as e:
        logger.error("Error checking file existence", error=str(e))
        return False
64+
3365
def copy_file(self, source_bucket, source_key, prefix=None, type=None):
3466
if not self.upload_enabled:
3567
return ""
@@ -113,7 +145,7 @@ def download_files_for_aps(self, files, prefix=None):
113145

114146
def download_and_upload_to_s3(self, url, prefix=None, headers=None, type=None):
115147
if not self.upload_enabled:
116-
return ""
148+
return {"path": "", "version_id": ""}
117149

118150
if not prefix:
119151
prefix = str(uuid4())
@@ -127,11 +159,37 @@ def download_and_upload_to_s3(self, url, prefix=None, headers=None, type=None):
127159
response.raise_for_status()
128160
except requests.exceptions.HTTPError as e:
129161
logger.error("Failed to download file", error=str(e), url=url)
130-
return
162+
return {"path": "", "version_id": ""}
163+
164+
if self.file_exists_with_same_checksum(
165+
self.bucket, destination_key, data=response.content
166+
):
167+
logger.info(
168+
"File already exists with the same checksum, skipping upload",
169+
url=url,
170+
destination=f"{self.bucket}/{destination_key}",
171+
)
172+
try:
173+
# Get the VersionId using head_object
174+
head_response = self.client.head_object(
175+
Bucket=self.bucket, Key=destination_key
176+
)
177+
version_id = head_response.get("VersionId", "")
178+
return {
179+
"path": f"{self.bucket}/{destination_key}",
180+
"version_id": version_id,
181+
}
182+
except Exception as e:
183+
logger.error(
184+
"Failed to get version ID for existing file",
185+
error=str(e),
186+
bucket=self.bucket,
187+
key=destination_key,
188+
)
189+
return {"path": f"{self.bucket}/{destination_key}", "version_id": ""}
131190

132191
try:
133-
# Upload the file to S3
134-
self.client.put_object(
192+
put_response = self.client.put_object(
135193
Body=response.content,
136194
Bucket=self.bucket,
137195
Key=destination_key,
@@ -140,11 +198,17 @@ def download_and_upload_to_s3(self, url, prefix=None, headers=None, type=None):
140198
},
141199
ACL="public-read",
142200
)
143-
return f"{self.bucket}/{destination_key}"
201+
# Extract VersionId from the response
202+
version_id = put_response.get("VersionId", "")
203+
return {
204+
"path": f"{self.bucket}/{destination_key}",
205+
"version_id": version_id,
206+
}
144207
except Exception as e:
145208
logger.error(
146209
"Failed to upload file",
147210
error=str(e),
148211
bucket=self.bucket,
149212
key=destination_key,
150213
)
214+
return {"path": "", "version_id": ""}

docker-compose.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,6 @@ services:
254254
}
255255
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
256256
airflow_version_comparable=$$(ver $${airflow_version})
257-
min_airflow_version=2.2.0
258-
min_airflow_version_comparable=$$(ver $${min_airflow_version})
259257
if (( airflow_version_comparable < min_airflow_version_comparable )); then
260258
echo
261259
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"

0 commit comments

Comments
 (0)