
Commit e993592

Polymorphic (de)serialization support (#7104, PR #7127)

2 parents a853d3d + e44490c

27 files changed: +1347 -962 lines

.mypy.ini

Lines changed: 5 additions & 1 deletion
@@ -55,7 +55,11 @@ modules =
     azul.service.app_controller,
     azul.indexer.mirror_service,
     azul.digests,
-    lambdas.indexer.app
+    lambdas.indexer.app,
+    azul.schemas,
+    azul.indexer.index_controller,
+    azul.indexer.index_queue_service,
+    azul.indexer.index_repository_service
 packages =
     azul.openapi

lambdas/indexer/app.py

Lines changed: 3 additions & 214 deletions
@@ -1,4 +1,3 @@
-import json
 import logging
 from typing import (
     Optional,
@@ -7,14 +6,9 @@
 import chalice
 
 from azul import (
-    CatalogName,
     JSON,
     cached_property,
     config,
-    mutable_furl,
-)
-from azul.azulclient import (
-    IndexAction,
 )
 from azul.chalice import (
     LambdaMetric,
@@ -43,15 +37,6 @@
 )
 from azul.openapi import (
     format_description as fd,
-    params,
-    responses,
-    schema,
-)
-from azul.openapi.responses import (
-    json_content,
-)
-from azul.queues import (
-    Queues,
 )
 from azul.types import (
     not_none,
@@ -84,14 +69,7 @@ def index_controller(self) -> IndexController:
 
     @cached_property
     def mirror_controller(self) -> MirrorController:
-
-        def schema_url(*, schema_name: str, version: int) -> mutable_furl:
-            path = self.schema_url_path.format(facility='mirror',
-                                               schema_name=schema_name,
-                                               version_and_extension=f'v{version}.json')
-            return self.base_url.set(path=path)
-
-        return MirrorController(app=self, schema_url_func=schema_url)
+        return MirrorController(app=self)
 
     @cached_property
     def log_controller(self) -> LogForwardingController:
@@ -124,13 +102,6 @@ def decorator(f):
         else:
             return lambda func: func
 
-    schema_url_path = '/schemas/{facility}/{schema_name}/{version_and_extension}'
-
-    def schema_resource(self, *path: str) -> JSON:
-        schema = json.loads(app.load_static_resource('schemas', *path))
-        schema['$id'] = str(self.self_url)
-        return schema
-
     def _authenticate(self) -> Optional[HMACAuthentication]:
         return self.auth_from_request(not_none(self.current_request))
 
@@ -140,153 +111,7 @@ def _authenticate(self) -> Optional[HMACAuthentication]:
 
 globals().update(app.default_routes())
 
-
-@app.route(
-    '/{catalog}/{action}',
-    methods=['POST'],
-    spec={
-        'tags': ['Indexing'],
-        'summary': 'Notify the indexer to perform an action on a bundle',
-        'description': fd('''
-            Queue a bundle for addition to or deletion from the index.
-
-            The request must be authenticated using HMAC via the ``signature``
-            header. Each Azul deployment has its own unique HMAC key. The HMAC
-            components are the request method, request path, and the SHA256
-            digest of the request body.
-
-            A valid HMAC header proves that the client is in possession of the
-            secret HMAC key and that the request wasn't tampered with while
-            travelling between client and service, even though the latter is not
-            strictly necessary considering that TLS is used to encrypt the
-            entire exchange. Internal clients can obtain the secret key from the
-            environment they are running in, and that they share with the
-            service. External clients must have been given the secret key. The
-            now-defunct DSS was such an external client. The Azul indexer
-            provided the HMAC secret to DSS when it registered with DSS to be
-            notified about bundle additions/deletions. These days only internal
-            clients use this endpoint.
-        '''),
-        'requestBody': {
-            'description': 'Contents of the notification',
-            'required': True,
-            **json_content(schema.object(
-                bundle_fqid=schema.object(
-                    uuid=str,
-                    version=str,
-                    source=schema.object(
-                        id=str,
-                        spec=str
-                    )
-                )
-            ))
-        },
-        'parameters': [
-            params.path('catalog',
-                        schema.enum(*config.catalogs),
-                        description='The name of the catalog to notify.'),
-            params.path('action',
-                        schema.enum(IndexAction.add.name, IndexAction.delete.name),
-                        description='Which action to perform.'),
-            params.header('signature',
-                          str,
-                          description='HMAC authentication signature.')
-        ],
-        'responses': {
-            '200': {
-                'description': 'Notification was successfully queued for processing'
-            },
-            '400': {
-                'description': 'Request was rejected due to malformed parameters'
-            },
-            '401': {
-                'description': 'Request lacked a valid HMAC header'
-            }
-        }
-    }
-)
-def post_notification(catalog: CatalogName, action: str):
-    """
-    Receive a notification event and queue it for indexing or deletion.
-    """
-    return app.index_controller.handle_notification(catalog, action)
-
-
-@app.metric_alarm(metric=LambdaMetric.errors,
-                  threshold=int(config.contribution_concurrency(retry=False) * 2 / 3),
-                  period=5 * 60)
-@app.metric_alarm(metric=LambdaMetric.throttles,
-                  threshold=int(96000 / config.contribution_concurrency(retry=False)),
-                  period=5 * 60)
-@app.on_sqs_message(
-    queue=config.notifications_queue.name,
-    batch_size=1
-)
-def contribute(event: chalice.app.SQSEvent):
-    app.index_controller.contribute(event)
-
-
-@app.metric_alarm(metric=LambdaMetric.errors,
-                  threshold=int(config.aggregation_concurrency(retry=False) * 3),
-                  period=5 * 60)
-@app.metric_alarm(metric=LambdaMetric.throttles,
-                  threshold=int(37760 / config.aggregation_concurrency(retry=False)),
-                  period=5 * 60)
-@app.on_sqs_message(
-    queue=config.tallies_queue.name,
-    batch_size=Queues.batch_size
-)
-def aggregate(event: chalice.app.SQSEvent):
-    app.index_controller.aggregate(event)
-
-
-# Any messages in the tallies queue that fail being processed will be retried
-# with more RAM in the tallies_retry queue.
-
-@app.metric_alarm(metric=LambdaMetric.errors,
-                  threshold=int(config.aggregation_concurrency(retry=True) * 1 / 16),
-                  period=5 * 60)
-@app.metric_alarm(metric=LambdaMetric.throttles,
-                  threshold=0,
-                  period=5 * 60)
-@app.on_sqs_message(
-    queue=config.tallies_queue.to_retry.name,
-    batch_size=Queues.batch_size
-)
-def aggregate_retry(event: chalice.app.SQSEvent):
-    app.index_controller.aggregate(event, retry=True)
-
-
-# Any messages in the notifications queue that fail being processed will be
-# retried with more RAM and a longer timeout in the notifications_retry queue.
-
-@app.metric_alarm(metric=LambdaMetric.errors,
-                  threshold=int(config.contribution_concurrency(retry=True) * 1 / 4),
-                  period=5 * 60)
-@app.metric_alarm(metric=LambdaMetric.throttles,
-                  threshold=int(31760 / config.contribution_concurrency(retry=True)),
-                  period=5 * 60)
-@app.on_sqs_message(
-    queue=config.notifications_queue.to_retry.name,
-    batch_size=1
-)
-def contribute_retry(event: chalice.app.SQSEvent):
-    app.index_controller.contribute(event, retry=True)
-
-
-if config.enable_mirroring:
-    @app.metric_alarm(metric=LambdaMetric.errors,
-                      threshold=int(config.mirroring_concurrency * 2 / 3),
-                      period=5 * 60)
-    @app.metric_alarm(metric=LambdaMetric.throttles,
-                      threshold=int(96000 / config.mirroring_concurrency),
-                      period=5 * 60)
-    @app.on_sqs_message(
-        queue=config.mirror_queue.name,
-        batch_size=1
-    )
-    def mirror(event: chalice.app.SQSEvent):
-        app.mirror_controller.mirror(event)
+globals().update(app.index_controller.handlers())
 
 
 @app.log_forwarder(
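The docstring removed above documents the endpoint's HMAC scheme: each deployment has its own secret key, and the signed components are the request method, the request path, and the SHA256 digest of the request body. As an illustration only, here is a minimal client-side sketch of signing those components with Python's standard hmac and hashlib modules; the message layout (newline-joined) and the HMAC digest algorithm are assumptions, since the actual HMACAuthentication implementation is not part of this diff.

import hashlib
import hmac


def sign_notification(key: bytes, method: str, path: str, body: bytes) -> str:
    # Components named in the removed docstring: request method, request
    # path, and the SHA256 digest of the request body.
    body_digest = hashlib.sha256(body).hexdigest()
    # Assumption: a newline-joined message and SHA-256 as the HMAC digest;
    # the real wire format is defined by Azul's HMACAuthentication, which is
    # outside this excerpt.
    message = '\n'.join([method, path, body_digest]).encode()
    return hmac.new(key, message, hashlib.sha256).hexdigest()


# A client would send the result in the `signature` header of the
# POST /{catalog}/{action} request.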
@@ -303,40 +128,4 @@ def forward_s3_logs(event: chalice.app.S3Event):
     app.log_controller.forward_s3_access_logs(event)
 
 
-json_schema_docs = 'https://json-schema.org/docs'
-
-
-@app.route(
-    app.schema_url_path,
-    methods=['GET'],
-    cors=True,
-    spec={
-        'summary': 'Retrieve JSON schemas',
-        'tags': ['Auxiliary'],
-        'parameters': [
-            params.path('facility', str),
-            params.path('schema_name', str),
-            params.path('version_and_extension', schema.pattern(r'v\d+\.json')),
-        ],
-        'description': fd(
-            f'''
-            [JSON Schemas]({json_schema_docs}) for various Azul facilities.
-            '''
-        ),
-        'responses': {
-            '200': {
-                'description': 'Contents of the schema',
-                **responses.json_content(
-                    schema.object(
-                        schema=str,
-                        id=str,
-                        type=str,
-                        additionalProperties=True
-                    )
-                )
-            }
-        }
-    }
-)
-def get_schema(facility, schema_name, version_and_extension):
-    return app.schema_resource(facility, schema_name, version_and_extension)
+globals().update(app.mirror_controller.handlers())
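Both added lines replace module-level Chalice handlers with a handlers() method on the respective controller. The controller code itself is not part of this excerpt; the sketch below only illustrates the assumed shape of that pattern, with ExampleController and its queue name being hypothetical:

from typing import Callable


class ExampleController:
    """Hypothetical stand-in for IndexController/MirrorController."""

    def __init__(self, app) -> None:
        self.app = app

    def handlers(self) -> dict[str, Callable]:
        app = self.app

        @app.on_sqs_message(queue='some-queue', batch_size=1)  # placeholder queue
        def contribute(event) -> None:
            ...

        # Returning a name-to-handler mapping lets the Lambda module splice
        # the decorated functions back in at module level via
        # globals().update(controller.handlers()).
        return {handler.__name__: handler for handler in (contribute,)}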

scripts/mirror_file.py

Lines changed: 5 additions & 6 deletions
@@ -66,24 +66,23 @@ def mirror_file(catalog: CatalogName, file_uuid: str, part_size: int) -> str:
     file = get_file(catalog, file_uuid)
     # FIXME: mirror_file script is broken
     #        https://github.com/DataBiosphere/azul/issues/7105
-    service = MirrorService(schema_url_func=...)
-    upload_id = service.begin_mirroring_file(catalog, file)
+    service = MirrorService(catalog=catalog, schema_url_func=...)
+    upload_id = service.begin_mirroring_file(file)
     digest_value, digest_type = file.digest()
     hasher = get_resumable_hasher(digest_type)
 
     def mirror_parts():
         part = FilePart.first(file, part_size)
         while part is not None:
-            yield service.mirror_file_part(catalog, file, part, upload_id, hasher)
+            yield service.mirror_file_part(file, part, upload_id, hasher)
            part = part.next(file)
 
     etags = list(mirror_parts())
-    service.finish_mirroring_file(catalog=catalog,
-                                  file=file,
+    service.finish_mirroring_file(file=file,
                                   upload_id=upload_id,
                                   etags=etags,
                                   hasher=hasher)
-    return service.get_mirror_url(catalog, file)
+    return service.get_mirror_url(file)
 
 
 def main(argv):
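The MirrorService change visible above binds the catalog at construction instead of passing it to every method, which keeps the per-part loop free of repeated arguments. The outline below is only a sketch of the surface implied by the call sites in this script; parameter and return types are guesses, not the actual signatures in azul.indexer.mirror_service.

from typing import Callable


class MirrorServiceOutline:
    """Hypothetical outline inferred from the call sites in scripts/mirror_file.py."""

    def __init__(self, *, catalog: str, schema_url_func: Callable) -> None:
        self.catalog = catalog
        self.schema_url_func = schema_url_func

    def begin_mirroring_file(self, file) -> str:
        ...  # presumably starts a multipart upload and returns the upload ID

    def mirror_file_part(self, file, part, upload_id: str, hasher) -> str:
        ...  # presumably uploads one part and returns its ETag

    def finish_mirroring_file(self, *, file, upload_id: str, etags: list[str], hasher) -> None:
        ...  # presumably completes the multipart upload

    def get_mirror_url(self, file) -> str:
        ...  # presumably returns the URL of the mirrored file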

scripts/reindex.py

Lines changed: 1 addition & 1 deletion
@@ -175,7 +175,7 @@ def main(argv: list[str]):
         if args.local:
             num_notifications += azul.local_reindex(catalog, args.prefix)
         else:
-            azul.remote_reindex(catalog, sources)
+            azul.index_queue_service.remote_reindex(catalog, sources)
             num_notifications = None
     else:
         log.info('Skipping catalog %r (no matching sources)', catalog)
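The one-line change above moves remote_reindex from the client facade onto a dedicated index_queue_service attribute, matching the azul.indexer.index_queue_service module added to the mypy configuration. The sketch below is a guess at how that delegation might be wired; only the names remote_reindex and index_queue_service come from this diff, everything else is hypothetical.

from functools import cached_property


class IndexQueueService:
    """Hypothetical home of the queue-facing reindex logic."""

    def remote_reindex(self, catalog: str, sources: set[str]) -> None:
        ...  # queue reindex notifications for the given sources


class AzulClient:
    """Hypothetical facade matching the call site azul.index_queue_service."""

    @cached_property
    def index_queue_service(self) -> IndexQueueService:
        return IndexQueueService()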
