Skip to content

refactor: remove tracer dependencies to support dsm sqs -> lambda #612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: main
Choose a base branch
from

Conversation

michael-zhao459
Copy link
Collaborator

@michael-zhao459 michael-zhao459 commented Jun 9, 2025

What does this PR do?

Removes dependencies on internal tracer code. The get_dsm_context() is moved inside the lambda layer, and the public facing api for setting checkpoints is used instead of internal dsm code in the tracer.

Motivation

Helps decouple the lambda layer code from the tracer code, keeps with bests practice of not using internal implementation code.

Testing Guidelines

Function was properly unit tested

Additional Notes

IMPORTANT: This PR cannot get merged until the tracer releases a version that includes this PR DataDog/dd-trace-py#13646 where the manual_checkpoint parameter is added to the set_consume_checkpoint() code

Types of Changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)

Copy link
Collaborator Author

michael-zhao459 commented Jun 9, 2025

@michael-zhao459 michael-zhao459 marked this pull request as ready for review June 9, 2025 18:51
@michael-zhao459 michael-zhao459 requested review from a team as code owners June 9, 2025 18:51
@michael-zhao459 michael-zhao459 changed the title move get dsm context logic into lambda layer code feat: move get dsm context logic into lambda layer code Jun 9, 2025
@michael-zhao459 michael-zhao459 marked this pull request as draft June 11, 2025 13:57
@michael-zhao459 michael-zhao459 changed the title feat: move get dsm context logic into lambda layer code refactor: remove tracer dependencies to support dsm lambda -> sqs Jun 11, 2025
@michael-zhao459 michael-zhao459 changed the title refactor: remove tracer dependencies to support dsm lambda -> sqs refactor: remove tracer dependencies to support dsm sqs -> lambda Jun 11, 2025
)
except Exception as e:
logger.error(format_err_with_traceback(e))
arn = record.get("eventSourceARN", "")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we remove the try / except here. Is there a reason for that (maybe there is and I don't see it).
But we want to make sure our instrumentation never prevents the lambda from being executed, even if there is an issue with the instrumentation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, we should move this inside the try/except.

},
}

result = _get_dsm_context_from_lambda(message)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] newline here

@robcarlan-datadog
Copy link

Wait for tracer version 3.9.2 before we can merge

@michael-zhao459 michael-zhao459 marked this pull request as ready for review June 16, 2025 20:09
context_json = None
message_attributes = message.get("messageAttributes")
if not message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're logging debug messages multiple times for the same record.


datadog_attr = message_attributes["_datadog"]

if "stringValue" in datadog_attr:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do a type check here to ensure this is a dict.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to clarify but we are checking that context_json is a dict right? I think context_json is the only one we need to make sure is a dict, the test you asked me to write also signaled that to me

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also make sure datadog_attr is a dict.

datadog_attr = message_attributes["_datadog"]

if "stringValue" in datadog_attr:
# SQS -> lambda
Copy link
Contributor

@purple4reina purple4reina Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the event_type to avoid doing unnecessary work. We should already mostly know the shape of the event. Without doing so, this method is gonna get insanely large.

I would recommend creating a separate _get_dsm_context for each event type.

},
{
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3",
"body": "Message 3",
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add another test where the stringValue isn't a dict.

context_json = _get_dsm_context_from_lambda(record)
if not context_json:
logger.debug("DataStreams skipped lambda message: %r", record)
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is untested.

carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)
except Exception as e:
logger.error(f"Unable to set dsm context: {e}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is untested. We should test to make sure that if there's an exception in setting the checkpoint that this method properly captures the error.

payload_size = calculate_sqs_payload_size(record)
context_json = _get_dsm_context_from_sqs_lambda(record)
if not context_json:
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you continue instead of return?

logger.debug("DataStreams did not handle lambda message: %r", message)
return None
else:
logger.debug("DataStreams did not handle lambda message: %r", message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend making each of these log lines slightly different. That way when one is encountered, it is easy to find the exact line of code where it was produced. Otherwise, we don't know what the actual issue was.

return None
else:
logger.debug(
"DataStreams did not handle lambda message: %r, no dsm context", message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put the %r at the end, the message itself could be quite long.



def _set_dsm_context_for_record(context_json, type, arn):
from ddtrace.data_streams import set_consume_checkpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be sure to set a minimum version for the ddtrace dependency. To do that, you'll want to find the first version of ddtrace that includes this set_consume_checkpoint. Then update pyproject.toml with this version.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! Thanks for letting me know how this is done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants