Skip to content

feat: add kinesis -> lambda support #614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 103 additions & 20 deletions datadog_lambda/dsm.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,121 @@
from datadog_lambda import logger
import logging
import json
import base64

from datadog_lambda.trigger import EventTypes

logger = logging.getLogger(__name__)

def set_dsm_context(event, event_source):

def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
_dsm_set_sqs_context(event)
elif event_source.equals(EventTypes.SNS):
_dsm_set_sns_context(event)
elif event_source.equals(EventTypes.KINESIS):
_dsm_set_kinesis_context(event)


def _dsm_set_sqs_context(event):
from datadog_lambda.wrapper import format_err_with_traceback
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.botocore import (
get_datastreams_context,
calculate_sqs_payload_size,
)
records = event.get("Records")
if records is None:
return

for record in records:
arn = record.get("eventSourceARN", "")
_set_dsm_context_for_record(record, "sqs", arn)


def _dsm_set_sns_context(event):
records = event.get("Records")
if records is None:
return
processor = data_streams_processor()

for record in records:
try:
queue_arn = record.get("eventSourceARN", "")
sns_data = record.get("Sns")
if not sns_data:
return
arn = sns_data.get("TopicArn", "")
_set_dsm_context_for_record(sns_data, "sns", arn)


def _dsm_set_kinesis_context(event):
records = event.get("Records")
if records is None:
return

for record in records:
arn = record.get("eventSourceARN", "")
_set_dsm_context_for_record(record, "kinesis", arn)

contextjson = get_datastreams_context(record)
payload_size = calculate_sqs_payload_size(record)

ctx = DsmPathwayCodec.decode(contextjson, processor)
ctx.set_checkpoint(
["direction:in", f"topic:{queue_arn}", "type:sqs"],
payload_size=payload_size,
def _set_dsm_context_for_record(record, type, arn):
from ddtrace.data_streams import set_consume_checkpoint

try:
context_json = _get_dsm_context_from_lambda(record)
if not context_json:
logger.debug("DataStreams skipped lambda message: %r", record)
return

carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)
except Exception as e:
logger.error(f"Unable to set dsm context: {e}")


def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
"""
context_json = None
message_body = message

if "kinesis" in message:
try:
kinesis_data = json.loads(
base64.b64decode(message["kinesis"]["data"]).decode()
)
except Exception as e:
logger.error(format_err_with_traceback(e))
return kinesis_data.get("_datadog")
except (ValueError, TypeError, KeyError):
logger.debug("Unable to parse kinesis data for lambda message")
return None

if "Sns" in message:
message_body = message["Sns"]

message_attributes = message_body.get("MessageAttributes") or message_body.get(
"messageAttributes"
)

if not message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None

if "_datadog" not in message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None

datadog_attr = message_attributes["_datadog"]

if message_body.get("Type") == "Notification":
# SNS -> lambda notification
if datadog_attr.get("Type") == "Binary":
context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
elif "stringValue" in datadog_attr:
# SQS -> lambda
context_json = json.loads(datadog_attr["stringValue"])
else:
logger.debug("DataStreams did not handle lambda message: %r", message)

return context_json


def _create_carrier_get(context_json):
def carrier_get(key):
return context_json.get(key)

return carrier_get
Binary file added layers/datadog_lambda_py-amd64-3.13.zip
Binary file not shown.
Binary file added layers/datadog_lambda_py-arm64-3.13.zip
Binary file not shown.
472 changes: 445 additions & 27 deletions tests/test_dsm.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,46 @@
import unittest
from unittest.mock import patch, MagicMock
import base64
import json
from unittest.mock import patch

from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
from datadog_lambda.dsm import (
set_dsm_context,
_dsm_set_sqs_context,
_dsm_set_sns_context,
_dsm_set_kinesis_context,
_get_dsm_context_from_lambda,
)
from datadog_lambda.trigger import EventTypes, _EventSource


class TestDsmSQSContext(unittest.TestCase):
class TestSetDSMContext(unittest.TestCase):
def setUp(self):
patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
self.mock_dsm_set_sqs_context = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
self.mock_data_streams_processor = patcher.start()
patcher = patch("ddtrace.data_streams.set_consume_checkpoint")
self.mock_set_consume_checkpoint = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context")
self.mock_get_datastreams_context = patcher.start()
self.mock_get_datastreams_context.return_value = {}
patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda")
self.mock_get_dsm_context_from_lambda = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch(
"ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size"
)
self.mock_calculate_sqs_payload_size = patcher.start()
self.mock_calculate_sqs_payload_size.return_value = 100
patcher = patch("datadog_lambda.dsm._dsm_set_sns_context")
self.mock_dsm_set_sns_context = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode")
self.mock_dsm_pathway_codec_decode = patcher.start()
patcher = patch("datadog_lambda.dsm._dsm_set_kinesis_context")
self.mock_dsm_set_kinesis_context = patcher.start()
self.addCleanup(patcher.stop)

def test_non_sqs_event_source_does_nothing(self):
"""Test that non-SQS event sources don't trigger DSM context setting"""
event = {}
# Use Unknown Event Source
event_source = _EventSource(EventTypes.UNKNOWN)
set_dsm_context(event, event_source)

# DSM context should not be set for non-SQS events
self.mock_dsm_set_sqs_context.assert_not_called()

def test_sqs_event_with_no_records_does_nothing(self):
@@ -51,7 +53,7 @@ def test_sqs_event_with_no_records_does_nothing(self):

for event in events_with_no_records:
_dsm_set_sqs_context(event)
self.mock_data_streams_processor.assert_not_called()
self.mock_set_consume_checkpoint.assert_not_called()

def test_sqs_event_triggers_dsm_sqs_context(self):
"""Test that SQS event sources trigger the SQS-specific DSM context function"""
@@ -77,36 +79,452 @@ def test_sqs_multiple_records_process_each_record(self):
{
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1",
"body": "Message 1",
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(
{"dd-pathway-ctx-base64": "context1"}
),
"dataType": "String",
}
},
},
{
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2",
"body": "Message 2",
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(
{"dd-pathway-ctx-base64": "context2"}
),
"dataType": "String",
}
},
},
{
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3",
"body": "Message 3",
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(
{"dd-pathway-ctx-base64": "context3"}
),
"dataType": "String",
}
},
},
]
}

mock_context = MagicMock()
self.mock_dsm_pathway_codec_decode.return_value = mock_context
self.mock_get_dsm_context_from_lambda.side_effect = [
{"dd-pathway-ctx-base64": "context1"},
{"dd-pathway-ctx-base64": "context2"},
{"dd-pathway-ctx-base64": "context3"},
]

_dsm_set_sqs_context(multi_record_event)

self.assertEqual(mock_context.set_checkpoint.call_count, 3)
self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3)

calls = mock_context.set_checkpoint.call_args_list
calls = self.mock_set_consume_checkpoint.call_args_list
expected_arns = [
"arn:aws:sqs:us-east-1:123456789012:queue1",
"arn:aws:sqs:us-east-1:123456789012:queue2",
"arn:aws:sqs:us-east-1:123456789012:queue3",
]
expected_contexts = ["context1", "context2", "context3"]

for i, call in enumerate(calls):
args, kwargs = call
service_type = args[0]
arn = args[1]
carrier_get_func = args[2]

self.assertEqual(service_type, "sqs")

self.assertEqual(arn, expected_arns[i])

pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
self.assertEqual(pathway_ctx, expected_contexts[i])

def test_sns_event_with_no_records_does_nothing(self):
"""Test that events where Records is None don't trigger DSM processing"""
events_with_no_records = [
{},
{"Records": None},
{"someOtherField": "value"},
]

for event in events_with_no_records:
_dsm_set_sns_context(event)
self.mock_set_consume_checkpoint.assert_not_called()

def test_sns_event_triggers_dsm_sns_context(self):
"""Test that SNS event sources trigger the SNS-specific DSM context function"""
sns_event = {
"Records": [
{
"EventSource": "aws:sns",
"Sns": {
"TopicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
"Message": "Hello from SNS!",
},
}
]
}

event_source = _EventSource(EventTypes.SNS)
set_dsm_context(sns_event, event_source)

self.mock_dsm_set_sns_context.assert_called_once_with(sns_event)

def test_sns_multiple_records_process_each_record(self):
"""Test that each record in an SNS event gets processed individually"""
multi_record_event = {
"Records": [
{
"EventSource": "aws:sns",
"Sns": {
"TopicArn": "arn:aws:sns:us-east-1:123456789012:topic1",
"Message": "Message 1",
"MessageAttributes": {
"_datadog": {
"Type": "Binary",
"Value": base64.b64encode(
json.dumps(
{"dd-pathway-ctx-base64": "context1"}
).encode("utf-8")
).decode("utf-8"),
}
},
},
},
{
"EventSource": "aws:sns",
"Sns": {
"TopicArn": "arn:aws:sns:us-east-1:123456789012:topic2",
"Message": "Message 2",
"MessageAttributes": {
"_datadog": {
"Type": "Binary",
"Value": base64.b64encode(
json.dumps(
{"dd-pathway-ctx-base64": "context2"}
).encode("utf-8")
).decode("utf-8"),
}
},
},
},
{
"EventSource": "aws:sns",
"Sns": {
"TopicArn": "arn:aws:sns:us-east-1:123456789012:topic3",
"Message": "Message 3",
"MessageAttributes": {
"_datadog": {
"Type": "Binary",
"Value": base64.b64encode(
json.dumps(
{"dd-pathway-ctx-base64": "context3"}
).encode("utf-8")
).decode("utf-8"),
}
},
},
},
]
}

self.mock_get_dsm_context_from_lambda.side_effect = [
{"dd-pathway-ctx-base64": "context1"},
{"dd-pathway-ctx-base64": "context2"},
{"dd-pathway-ctx-base64": "context3"},
]

_dsm_set_sns_context(multi_record_event)

self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3)

calls = self.mock_set_consume_checkpoint.call_args_list
expected_arns = [
"arn:aws:sns:us-east-1:123456789012:topic1",
"arn:aws:sns:us-east-1:123456789012:topic2",
"arn:aws:sns:us-east-1:123456789012:topic3",
]
expected_contexts = ["context1", "context2", "context3"]

for i, call in enumerate(calls):
args, kwargs = call
service_type = args[0]
arn = args[1]
carrier_get_func = args[2]

self.assertEqual(service_type, "sns")

self.assertEqual(arn, expected_arns[i])

pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
self.assertEqual(pathway_ctx, expected_contexts[i])

def test_kinesis_event_with_no_records_does_nothing(self):
"""Test that events where Records is None don't trigger DSM processing"""
events_with_no_records = [
{},
{"Records": None},
{"someOtherField": "value"},
]

for event in events_with_no_records:
_dsm_set_kinesis_context(event)
self.mock_set_consume_checkpoint.assert_not_called()

def test_kinesis_event_triggers_dsm_kinesis_context(self):
"""Test that Kinesis event sources trigger the Kinesis-specific DSM context function"""
kinesis_event = {
"Records": [
{
"eventSource": "aws:kinesis",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
"kinesis": {
"data": "SGVsbG8gZnJvbSBLaW5lc2lzIQ==",
"partitionKey": "partition-key",
},
}
]
}

event_source = _EventSource(EventTypes.KINESIS)
set_dsm_context(kinesis_event, event_source)

self.mock_dsm_set_kinesis_context.assert_called_once_with(kinesis_event)

def test_kinesis_multiple_records_process_each_record(self):
"""Test that each record in a Kinesis event gets processed individually"""
multi_record_event = {
"Records": [
{
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
"kinesis": {
"data": base64.b64encode(
json.dumps({"dd-pathway-ctx-base64": "context1"}).encode(
"utf-8"
)
).decode("utf-8"),
"partitionKey": "partition-1",
},
},
{
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
"kinesis": {
"data": base64.b64encode(
json.dumps({"dd-pathway-ctx-base64": "context2"}).encode(
"utf-8"
)
).decode("utf-8"),
"partitionKey": "partition-2",
},
},
{
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
"kinesis": {
"data": base64.b64encode(
json.dumps({"dd-pathway-ctx-base64": "context3"}).encode(
"utf-8"
)
).decode("utf-8"),
"partitionKey": "partition-3",
},
},
]
}

self.mock_get_dsm_context_from_lambda.side_effect = [
{"dd-pathway-ctx-base64": "context1"},
{"dd-pathway-ctx-base64": "context2"},
{"dd-pathway-ctx-base64": "context3"},
]

_dsm_set_kinesis_context(multi_record_event)

self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3)

calls = self.mock_set_consume_checkpoint.call_args_list
expected_arns = [
"arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
"arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
"arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
]
expected_contexts = ["context1", "context2", "context3"]

for i, call in enumerate(calls):
args, kwargs = call
tags = args[0]
self.assertIn("direction:in", tags)
self.assertIn(f"topic:{expected_arns[i]}", tags)
self.assertIn("type:sqs", tags)
self.assertEqual(kwargs["payload_size"], 100)
service_type = args[0]
arn = args[1]
carrier_get_func = args[2]

self.assertEqual(service_type, "kinesis")

self.assertEqual(arn, expected_arns[i])

pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
self.assertEqual(pathway_ctx, expected_contexts[i])


class TestGetDSMContext(unittest.TestCase):
def test_sqs_to_lambda_string_value_format(self):
"""Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "789123456",
"x-datadog-parent-id": "321987654",
"dd-pathway-ctx": "test-pathway-ctx",
}

lambda_record = {
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185",
},
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(trace_context),
"stringListValues": [],
"binaryListValues": [],
"dataType": "String",
},
"myAttribute": {
"stringValue": "myValue",
"stringListValues": [],
"binaryListValues": [],
"dataType": "String",
},
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2",
}

result = _get_dsm_context_from_lambda(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "789123456"
assert result["x-datadog-parent-id"] == "321987654"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_lambda_format(self):
"""Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "111111111",
"x-datadog-parent-id": "222222222",
"dd-pathway-ctx": "test-pathway-ctx",
}
binary_data = base64.b64encode(
json.dumps(trace_context).encode("utf-8")
).decode("utf-8")

sns_lambda_record = {
"EventSource": "aws:sns",
"EventSubscriptionArn": (
"arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012"
),
"Sns": {
"Type": "Notification",
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
"TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic",
"Subject": "Test Subject",
"Message": "Hello from SNS!",
"Timestamp": "2023-01-01T12:00:00.000Z",
"MessageAttributes": {
"_datadog": {"Type": "Binary", "Value": binary_data}
},
},
}

result = _get_dsm_context_from_lambda(sns_lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "111111111"
assert result["x-datadog-parent-id"] == "222222222"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_kinesis_to_lambda_format(self):
"""Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
trace_context = {
"x-datadog-trace-id": "555444333",
"x-datadog-parent-id": "888777666",
"dd-pathway-ctx": "test-pathway-ctx",
}

# Create the kinesis data payload
kinesis_payload = {
"_datadog": trace_context,
"actualData": "some business data",
}
encoded_kinesis_data = base64.b64encode(
json.dumps(kinesis_payload).encode("utf-8")
).decode("utf-8")

kinesis_lambda_record = {
"eventSource": "aws:kinesis",
"eventSourceARN": (
"arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
),
"kinesis": {
"data": encoded_kinesis_data,
"partitionKey": "partition-key-1",
"sequenceNumber": (
"49590338271490256608559692538361571095921575989136588898"
),
},
}

result = _get_dsm_context_from_lambda(kinesis_lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "555444333"
assert result["x-datadog-parent-id"] == "888777666"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_no_message_attributes(self):
"""Test message without MessageAttributes returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message without attributes",
}

result = _get_dsm_context_from_lambda(message)

assert result is None

def test_no_datadog_attribute(self):
"""Test message with MessageAttributes but no _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message",
"messageAttributes": {
"customAttribute": {"stringValue": "custom-value", "dataType": "String"}
},
}

result = _get_dsm_context_from_lambda(message)

assert result is None

def test_empty_datadog_attribute(self):
"""Test message with empty _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"messageAttributes": {"_datadog": {}},
}

result = _get_dsm_context_from_lambda(message)

assert result is None