Skip to content

refactor: remove tracer dependencies to support dsm sqs -> lambda #612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 69 additions & 20 deletions datadog_lambda/dsm.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,87 @@
from datadog_lambda import logger
import logging
import json
from datadog_lambda.trigger import EventTypes

logger = logging.getLogger(__name__)

def set_dsm_context(event, event_source):

def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
_dsm_set_sqs_context(event)


def _dsm_set_sqs_context(event):
from datadog_lambda.wrapper import format_err_with_traceback
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.botocore import (
get_datastreams_context,
calculate_sqs_payload_size,
)

records = event.get("Records")
if records is None:
return
processor = data_streams_processor()

for record in records:
try:
queue_arn = record.get("eventSourceARN", "")
arn = record.get("eventSourceARN", "")
context_json = _get_dsm_context_from_sqs_lambda(record)
if not context_json:
continue
_set_dsm_context_for_record(context_json, "sqs", arn)

except Exception as e:
logger.error(f"Unable to set dsm context: {e}")


def _set_dsm_context_for_record(context_json, type, arn):
from ddtrace.data_streams import set_consume_checkpoint

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be sure to set a minimum version for the ddtrace dependency. To do that, you'll want to find the first version of ddtrace that includes this set_consume_checkpoint. Then update pyproject.toml with this version.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! Thanks for letting me know how this is done

carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)

contextjson = get_datastreams_context(record)
payload_size = calculate_sqs_payload_size(record)

ctx = DsmPathwayCodec.decode(contextjson, processor)
ctx.set_checkpoint(
["direction:in", f"topic:{queue_arn}", "type:sqs"],
payload_size=payload_size,
def _get_dsm_context_from_sqs_lambda(message):
"""
Lambda-specific message shape for SQS -> Lambda:
- message.messageAttributes._datadog.stringValue
"""
context_json = None
message_attributes = message.get("messageAttributes")
if not message_attributes:
logger.debug(
"DataStreams skipped lambda message, no messageAttributes, message: %r",
message,
)
return None

if "_datadog" not in message_attributes:
logger.debug(
"DataStreams skipped lambda message, no datadog context, message: %r",
message,
)
return None

datadog_attr = message_attributes["_datadog"]
if not isinstance(datadog_attr, dict):
logger.debug(
"DataStreams did not handle lambda message, datadog context is not a dict, message: %r",
message,
)
return None

if "stringValue" in datadog_attr:
context_json = json.loads(datadog_attr["stringValue"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do a type check here to ensure this is a dict.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to clarify but we are checking that context_json is a dict right? I think context_json is the only one we need to make sure is a dict, the test you asked me to write also signaled that to me

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also make sure datadog_attr is a dict.

if not isinstance(context_json, dict):
logger.debug(
"DataStreams did not handle lambda message, dsm context is not a dict, message: %r",
message,
)
except Exception as e:
logger.error(format_err_with_traceback(e))
return None
else:
logger.debug(
"DataStreams did not handle lambda message, no dsm context, message: %r",
message,
)

return context_json


def _create_carrier_get(context_json):
def carrier_get(key):
return context_json.get(key)

return carrier_get
Loading
Loading