@@ -24,6 +24,19 @@ def _dsm_set_sqs_context(event):
24
24
_set_dsm_context_for_record (record , "sqs" , arn )
25
25
26
26
27
+ def _dsm_set_sns_context (event ):
28
+ records = event .get ("Records" )
29
+ if records is None :
30
+ return
31
+
32
+ for record in records :
33
+ sns_data = record .get ("Sns" )
34
+ if not sns_data :
35
+ return
36
+ arn = sns_data .get ("TopicArn" , "" )
37
+ _set_dsm_context_for_record (sns_data , "sns" , arn )
38
+
39
+
27
40
def _set_dsm_context_for_record (record , type , arn ):
28
41
from ddtrace .data_streams import set_consume_checkpoint
29
42
@@ -39,30 +52,6 @@ def _set_dsm_context_for_record(record, type, arn):
39
52
logger .error (f"Unable to set dsm context: { e } " )
40
53
41
54
42
- def _dsm_set_sns_context (event ):
43
- from ddtrace .data_streams import set_consume_checkpoint
44
-
45
- records = event .get ("Records" )
46
- if records is None :
47
- return
48
-
49
- for record in records :
50
- try :
51
- sns_data = record .get ("Sns" )
52
- if not sns_data :
53
- return
54
- arn = sns_data .get ("TopicArn" , "" )
55
- context_json = _get_dsm_context_from_lambda (sns_data )
56
- if not context_json :
57
- logger .debug ("DataStreams skipped lambda message: %r" , record )
58
- return
59
-
60
- carrier_get = _create_carrier_get (context_json )
61
- set_consume_checkpoint ("sns" , arn , carrier_get )
62
- except Exception as e :
63
- logger .error (f"Unable to set dsm context: { e } " )
64
-
65
-
66
55
def _get_dsm_context_from_lambda (message ):
67
56
"""
68
57
Lambda-specific message formats:
0 commit comments