Skip to content

feat: add sns->sqs->lambda support #618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 120 additions & 20 deletions datadog_lambda/dsm.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,138 @@
from datadog_lambda import logger
import logging
import json
import base64

from datadog_lambda.trigger import EventTypes

logger = logging.getLogger(__name__)

def set_dsm_context(event, event_source):

def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
_dsm_set_sqs_context(event)
elif event_source.equals(EventTypes.SNS):
_dsm_set_sns_context(event)
elif event_source.equals(EventTypes.KINESIS):
_dsm_set_kinesis_context(event)


def _dsm_set_sqs_context(event):
from datadog_lambda.wrapper import format_err_with_traceback
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.botocore import (
get_datastreams_context,
calculate_sqs_payload_size,
)
records = event.get("Records")
if records is None:
return

for record in records:
arn = record.get("eventSourceARN", "")
_set_dsm_context_for_record(record, "sqs", arn)


def _dsm_set_sns_context(event):
records = event.get("Records")
if records is None:
return
processor = data_streams_processor()

for record in records:
try:
queue_arn = record.get("eventSourceARN", "")
sns_data = record.get("Sns")
if not sns_data:
return
arn = sns_data.get("TopicArn", "")
_set_dsm_context_for_record(sns_data, "sns", arn)


def _dsm_set_kinesis_context(event):
records = event.get("Records")
if records is None:
return

for record in records:
arn = record.get("eventSourceARN", "")
_set_dsm_context_for_record(record, "kinesis", arn)

contextjson = get_datastreams_context(record)
payload_size = calculate_sqs_payload_size(record)

ctx = DsmPathwayCodec.decode(contextjson, processor)
ctx.set_checkpoint(
["direction:in", f"topic:{queue_arn}", "type:sqs"],
payload_size=payload_size,
def _set_dsm_context_for_record(record, type, arn):
from ddtrace.data_streams import set_consume_checkpoint

try:
context_json = _get_dsm_context_from_lambda(record)
if not context_json:
logger.debug("DataStreams skipped lambda message: %r", record)
return

carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint(type, arn, carrier_get)
except Exception as e:
logger.error(f"Unable to set dsm context: {e}")


def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
- message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
- message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
"""
context_json = None
message_body = message

if "kinesis" in message:
try:
kinesis_data = json.loads(
base64.b64decode(message["kinesis"]["data"]).decode()
)
return kinesis_data.get("_datadog")
except (ValueError, TypeError, KeyError):
logger.debug("Unable to parse kinesis data for lambda message")
return None
elif "Sns" in message:
message_body = message["Sns"]
else:
try:
body = message.get("body")
if body:
parsed_body = json.loads(body)
if "MessageAttributes" in parsed_body:
message_body = parsed_body
except (ValueError, TypeError):
logger.debug(
"Unable to parse lambda message body as JSON, treat as non-json"
)
except Exception as e:
logger.error(format_err_with_traceback(e))

message_attributes = message_body.get("MessageAttributes") or message_body.get(
"messageAttributes"
)

if not message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None

if "_datadog" not in message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None

datadog_attr = message_attributes["_datadog"]

if message_body.get("Type") == "Notification":
# SNS -> lambda notification
if datadog_attr.get("Type") == "Binary":
context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
elif "stringValue" in datadog_attr:
# SQS -> lambda
context_json = json.loads(datadog_attr["stringValue"])
elif "binaryValue" in datadog_attr:
# SNS -> SQS -> lambda, raw message delivery
context_json = json.loads(
base64.b64decode(datadog_attr["binaryValue"]).decode()
)
else:
logger.debug("DataStreams did not handle lambda message: %r", message)

return context_json


def _create_carrier_get(context_json):
def carrier_get(key):
return context_json.get(key)

return carrier_get
Binary file added layers/datadog_lambda_py-amd64-3.13.zip
Binary file not shown.
Binary file added layers/datadog_lambda_py-arm64-3.13.zip
Binary file not shown.
Loading