Skip to content

Commit c52970f

Browse files
authoredSep 27, 2024··
[SVLS-5265] S3 Event Handler Span Pointers (#513)
Adding Span Pointers for the upstream S3 objects when a lambda is triggered by an S3 ObjectCreated event.
·
v7.112.0v6.99.0
1 parent d97d9bb commit c52970f

File tree

8 files changed

+403
-216
lines changed

8 files changed

+403
-216
lines changed
 

‎datadog_lambda/span_pointers.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
from itertools import chain
2+
import logging
3+
from typing import List
4+
5+
from ddtrace._trace.utils_botocore.span_pointers import (
6+
_aws_s3_object_span_pointer_description,
7+
)
8+
from ddtrace._trace._span_pointer import _SpanPointerDirection
9+
from ddtrace._trace._span_pointer import _SpanPointerDescription
10+
from datadog_lambda.trigger import EventTypes
11+
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
def calculate_span_pointers(
17+
event_source,
18+
event,
19+
) -> List[_SpanPointerDescription]:
20+
try:
21+
if event_source.equals(EventTypes.S3):
22+
return _calculate_s3_span_pointers_for_event(event)
23+
24+
except Exception as e:
25+
logger.warning(
26+
"failed to calculate span pointers for event: %s",
27+
str(e),
28+
)
29+
30+
return []
31+
32+
33+
def _calculate_s3_span_pointers_for_event(event) -> List[_SpanPointerDescription]:
34+
# Example event:
35+
# https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
36+
37+
return list(
38+
chain.from_iterable(
39+
_calculate_s3_span_pointers_for_event_record(record)
40+
for record in event.get("Records", [])
41+
)
42+
)
43+
44+
45+
def _calculate_s3_span_pointers_for_event_record(
46+
record,
47+
) -> List[_SpanPointerDescription]:
48+
# Event types:
49+
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
50+
51+
if record.get("eventName").startswith("ObjectCreated:"):
52+
s3_information = record.get("s3", None)
53+
if s3_information is not None:
54+
return _calculate_s3_span_pointers_for_object_created_s3_information(
55+
s3_information
56+
)
57+
58+
return []
59+
60+
61+
def _calculate_s3_span_pointers_for_object_created_s3_information(
62+
s3_information,
63+
) -> List[_SpanPointerDescription]:
64+
try:
65+
bucket = s3_information["bucket"]["name"]
66+
key = s3_information["object"]["key"]
67+
etag = s3_information["object"]["eTag"]
68+
69+
except KeyError as e:
70+
logger.warning(
71+
"missing s3 information required to make a span pointer: %s",
72+
str(e),
73+
)
74+
return []
75+
76+
try:
77+
return [
78+
_aws_s3_object_span_pointer_description(
79+
pointer_direction=_SpanPointerDirection.UPSTREAM,
80+
bucket=bucket,
81+
key=key,
82+
etag=etag,
83+
)
84+
]
85+
86+
except Exception as e:
87+
logger.warning(
88+
"failed to generate S3 span pointer: %s",
89+
str(e),
90+
)
91+
return []

‎datadog_lambda/tracing.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,7 @@ def create_function_execution_span(
12591259
merge_xray_traces,
12601260
trigger_tags,
12611261
parent_span=None,
1262+
span_pointers=None,
12621263
):
12631264
tags = None
12641265
if context:
@@ -1296,6 +1297,14 @@ def create_function_execution_span(
12961297
span.set_tags(tags)
12971298
if parent_span:
12981299
span.parent_id = parent_span.span_id
1300+
if span_pointers:
1301+
for span_pointer_description in span_pointers:
1302+
span._add_span_pointer(
1303+
pointer_kind=span_pointer_description.pointer_kind,
1304+
pointer_direction=span_pointer_description.pointer_direction,
1305+
pointer_hash=span_pointer_description.pointer_hash,
1306+
extra_attributes=span_pointer_description.extra_attributes,
1307+
)
12991308
return span
13001309

13011310

‎datadog_lambda/wrapper.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
)
3131
from datadog_lambda.module_name import modify_module_name
3232
from datadog_lambda.patch import patch_all
33+
from datadog_lambda.span_pointers import calculate_span_pointers
3334
from datadog_lambda.tracing import (
3435
extract_dd_trace_context,
3536
create_dd_dummy_metadata_subsegment,
@@ -307,14 +308,15 @@ def _before(self, event, context):
307308
event, context, event_source, self.decode_authorizer_context
308309
)
309310
self.span = create_function_execution_span(
310-
context,
311-
self.function_name,
312-
is_cold_start(),
313-
is_proactive_init(),
314-
trace_context_source,
315-
self.merge_xray_traces,
316-
self.trigger_tags,
311+
context=context,
312+
function_name=self.function_name,
313+
is_cold_start=is_cold_start(),
314+
is_proactive_init=is_proactive_init(),
315+
trace_context_source=trace_context_source,
316+
merge_xray_traces=self.merge_xray_traces,
317+
trigger_tags=self.trigger_tags,
317318
parent_span=self.inferred_span,
319+
span_pointers=calculate_span_pointers(event_source, event),
318320
)
319321
else:
320322
set_correlation_ids()

‎poetry.lock

Lines changed: 122 additions & 196 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ classifiers = [
2727
python = ">=3.8.0,<4"
2828
datadog = ">=0.41.0,<1.0.0"
2929
wrapt = "^1.11.2"
30-
ddtrace = ">=2.10.0"
30+
ddtrace = ">=2.14.0"
3131
ujson = ">=5.9.0"
3232
boto3 = { version = "^1.34.0", optional = true }
3333
requests = { version ="^2.22.0", optional = true }

‎scripts/run_integration_tests.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
# Usage - run commands from repo root:
44
# To check if new changes to the layer cause changes to any snapshots:
5-
# BUILD_LAYERS=true DD_API_KEY=XXXX aws-vault exec sso-serverless-sandbox-account-admin -- ./scripts/run_integration_tests
5+
# BUILD_LAYERS=true DD_API_KEY=XXXX aws-vault exec sso-serverless-sandbox-account-admin -- ./scripts/run_integration_tests.sh
66
# To regenerate snapshots:
7-
# UPDATE_SNAPSHOTS=true DD_API_KEY=XXXX aws-vault exec sso-serverless-sandbox-account-admin -- ./scripts/run_integration_tests
7+
# UPDATE_SNAPSHOTS=true DD_API_KEY=XXXX aws-vault exec sso-serverless-sandbox-account-admin -- ./scripts/run_integration_tests.sh
88

99
set -e
1010

‎tests/test_span_pointers.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
from typing import List
2+
from typing import NamedTuple
3+
4+
from ddtrace._trace._span_pointer import _SpanPointerDirection
5+
from ddtrace._trace._span_pointer import _SpanPointerDescription
6+
from datadog_lambda.trigger import _EventSource
7+
from datadog_lambda.trigger import EventTypes
8+
from datadog_lambda.span_pointers import calculate_span_pointers
9+
import pytest
10+
11+
12+
class TestCalculateSpanPointers:
13+
class SpanPointersCase(NamedTuple):
14+
name: str
15+
event_source: _EventSource
16+
event: dict
17+
span_pointers: List[_SpanPointerDescription]
18+
19+
@pytest.mark.parametrize(
20+
"test_case",
21+
[
22+
SpanPointersCase(
23+
name="some unsupported event",
24+
event_source=_EventSource(EventTypes.UNKNOWN),
25+
event={},
26+
span_pointers=[],
27+
),
28+
SpanPointersCase(
29+
name="empty s3 event",
30+
event_source=_EventSource(EventTypes.S3),
31+
event={},
32+
span_pointers=[],
33+
),
34+
SpanPointersCase(
35+
name="sensible s3 event",
36+
event_source=_EventSource(EventTypes.S3),
37+
event={
38+
"Records": [
39+
{
40+
"eventName": "ObjectCreated:Put",
41+
"s3": {
42+
"bucket": {
43+
"name": "mybucket",
44+
},
45+
"object": {
46+
"key": "mykey",
47+
"eTag": "123abc",
48+
},
49+
},
50+
},
51+
],
52+
},
53+
span_pointers=[
54+
_SpanPointerDescription(
55+
pointer_kind="aws.s3.object",
56+
pointer_direction=_SpanPointerDirection.UPSTREAM,
57+
pointer_hash="8d49f5b0b742484159d4cd572bae1ce5",
58+
extra_attributes={},
59+
),
60+
],
61+
),
62+
SpanPointersCase(
63+
name="malformed s3 event",
64+
event_source=_EventSource(EventTypes.S3),
65+
event={
66+
"Records": [
67+
{
68+
"eventName": "ObjectCreated:Put",
69+
"s3": {
70+
"bucket": {
71+
"name": "mybucket",
72+
},
73+
"object": {
74+
"key": "mykey",
75+
# missing eTag
76+
},
77+
},
78+
},
79+
],
80+
},
81+
span_pointers=[],
82+
),
83+
],
84+
ids=lambda test_case: test_case.name,
85+
)
86+
def test_calculate_span_pointers(self, test_case: SpanPointersCase):
87+
assert (
88+
calculate_span_pointers(
89+
test_case.event_source,
90+
test_case.event,
91+
)
92+
== test_case.span_pointers
93+
)

‎tests/test_tracing.py

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
from ddtrace import tracer
1414
from ddtrace.context import Context
15+
from ddtrace._trace._span_pointer import _SpanPointer
16+
from ddtrace._trace._span_pointer import _SpanPointerDirection
17+
from ddtrace._trace._span_pointer import _SpanPointerDescription
1518

1619
from datadog_lambda.constants import (
1720
SamplingPriority,
@@ -746,20 +749,34 @@ class TestFunctionSpanTags(unittest.TestCase):
746749
def test_function(self):
747750
ctx = get_mock_context()
748751
span = create_function_execution_span(
749-
ctx, "", False, False, {"source": ""}, False, {}
752+
context=ctx,
753+
function_name="",
754+
is_cold_start=False,
755+
is_proactive_init=False,
756+
trace_context_source={"source": ""},
757+
merge_xray_traces=False,
758+
trigger_tags={},
759+
span_pointers=None,
750760
)
751761
self.assertEqual(span.get_tag("function_arn"), function_arn)
752762
self.assertEqual(span.get_tag("function_version"), "$LATEST")
753763
self.assertEqual(span.get_tag("resource_names"), "Function")
754764
self.assertEqual(span.get_tag("functionname"), "function")
765+
self.assertEqual(span._links, [])
755766

756767
def test_function_with_version(self):
757768
function_version = "1"
758769
ctx = get_mock_context(
759770
invoked_function_arn=function_arn + ":" + function_version
760771
)
761772
span = create_function_execution_span(
762-
ctx, "", False, False, {"source": ""}, False, {}
773+
context=ctx,
774+
function_name="",
775+
is_cold_start=False,
776+
is_proactive_init=False,
777+
trace_context_source={"source": ""},
778+
merge_xray_traces=False,
779+
trigger_tags={},
763780
)
764781
self.assertEqual(span.get_tag("function_arn"), function_arn)
765782
self.assertEqual(span.get_tag("function_version"), function_version)
@@ -770,7 +787,13 @@ def test_function_with_alias(self):
770787
function_alias = "alias"
771788
ctx = get_mock_context(invoked_function_arn=function_arn + ":" + function_alias)
772789
span = create_function_execution_span(
773-
ctx, "", False, False, {"source": ""}, False, {}
790+
context=ctx,
791+
function_name="",
792+
is_cold_start=False,
793+
is_proactive_init=False,
794+
trace_context_source={"source": ""},
795+
merge_xray_traces=False,
796+
trigger_tags={},
774797
)
775798
self.assertEqual(span.get_tag("function_arn"), function_arn)
776799
self.assertEqual(span.get_tag("function_version"), function_alias)
@@ -780,13 +803,13 @@ def test_function_with_alias(self):
780803
def test_function_with_trigger_tags(self):
781804
ctx = get_mock_context()
782805
span = create_function_execution_span(
783-
ctx,
784-
"",
785-
False,
786-
False,
787-
{"source": ""},
788-
False,
789-
{"function_trigger.event_source": "cloudwatch-logs"},
806+
context=ctx,
807+
function_name="",
808+
is_cold_start=False,
809+
is_proactive_init=False,
810+
trace_context_source={"source": ""},
811+
merge_xray_traces=False,
812+
trigger_tags={"function_trigger.event_source": "cloudwatch-logs"},
790813
)
791814
self.assertEqual(span.get_tag("function_arn"), function_arn)
792815
self.assertEqual(span.get_tag("resource_names"), "Function")
@@ -795,6 +818,49 @@ def test_function_with_trigger_tags(self):
795818
span.get_tag("function_trigger.event_source"), "cloudwatch-logs"
796819
)
797820

821+
def test_function_with_span_pointers(self):
822+
ctx = get_mock_context()
823+
span = create_function_execution_span(
824+
context=ctx,
825+
function_name="",
826+
is_cold_start=False,
827+
is_proactive_init=False,
828+
trace_context_source={"source": ""},
829+
merge_xray_traces=False,
830+
trigger_tags={},
831+
span_pointers=[
832+
_SpanPointerDescription(
833+
pointer_kind="some.kind",
834+
pointer_direction=_SpanPointerDirection.UPSTREAM,
835+
pointer_hash="some.hash",
836+
extra_attributes={},
837+
),
838+
_SpanPointerDescription(
839+
pointer_kind="other.kind",
840+
pointer_direction=_SpanPointerDirection.DOWNSTREAM,
841+
pointer_hash="other.hash",
842+
extra_attributes={"extra": "stuff"},
843+
),
844+
],
845+
)
846+
self.assertEqual(
847+
span._links,
848+
[
849+
_SpanPointer(
850+
pointer_kind="some.kind",
851+
pointer_direction=_SpanPointerDirection.UPSTREAM,
852+
pointer_hash="some.hash",
853+
extra_attributes={},
854+
),
855+
_SpanPointer(
856+
pointer_kind="other.kind",
857+
pointer_direction=_SpanPointerDirection.DOWNSTREAM,
858+
pointer_hash="other.hash",
859+
extra_attributes={"extra": "stuff"},
860+
),
861+
],
862+
)
863+
798864

799865
class TestSetTraceRootSpan(unittest.TestCase):
800866
def setUp(self):

0 commit comments

Comments
 (0)
Please sign in to comment.