File tree Expand file tree Collapse file tree 1 file changed +16
-7
lines changed Expand file tree Collapse file tree 1 file changed +16
-7
lines changed Original file line number Diff line number Diff line change @@ -35,16 +35,25 @@ def carrier_get(key):
35
35
36
36
37
37
def _dsm_set_sns_context (event ):
38
- from ddtrace .internal .datastreams .botocore import calculate_sns_payload_size
39
-
40
- def sns_payload_calculator (record , context_json ):
41
- return calculate_sns_payload_size (record , context_json )
38
+ from ddtrace .data_streams import set_consume_checkpoint
42
39
43
- def sns_arn_extractor (record ):
40
+ records = event .get ("Records" )
41
+ if records is None :
42
+ return
43
+ for record in records :
44
44
sns_data = record .get ("Sns" )
45
45
if not sns_data :
46
- return ""
47
- return sns_data .get ("TopicArn" , "" )
46
+ return
47
+ arn = sns_data .get ("TopicArn" , "" )
48
+ context_json = _get_dsm_context_from_lambda (record )
49
+ if not context_json :
50
+ logger .debug ("DataStreams skipped lambda message: %r" , record )
51
+ return None
52
+
53
+ def carrier_get (key ):
54
+ return context_json .get (key )
55
+
56
+ set_consume_checkpoint ("sns" , arn , carrier_get )
48
57
49
58
50
59
def _get_dsm_context_from_lambda (message ):
You can’t perform that action at this time.
0 commit comments