Skip to content

Commit 4716aa3

Browse files
ran-isenbergRan Isenberg
authored andcommitted
feat: Add Kinesis lambda event support to Parser utility
1 parent 33f07b9 commit 4716aa3

File tree

5 files changed

+15
-11
lines changed

5 files changed

+15
-11
lines changed

aws_lambda_powertools/utilities/parser/envelopes/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
from .base import BaseEnvelope
22
from .dynamodb import DynamoDBStreamEnvelope
33
from .event_bridge import EventBridgeEnvelope
4-
from .kinesis import KinesisEnvelope
4+
from .kinesis import KinesisDataStreamEnvelope
55
from .sns import SnsEnvelope
66
from .sqs import SqsEnvelope
77

88
__all__ = [
99
"DynamoDBStreamEnvelope",
1010
"EventBridgeEnvelope",
11-
"KinesisEnvelope",
11+
"KinesisDataStreamEnvelope",
1212
"SnsEnvelope",
1313
"SqsEnvelope",
1414
"BaseEnvelope",

aws_lambda_powertools/utilities/parser/envelopes/kinesis.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
logger = logging.getLogger(__name__)
99

1010

11-
class KinesisEnvelope(BaseEnvelope):
12-
"""Kinesis Envelope to extract array of Records
11+
class KinesisDataStreamEnvelope(BaseEnvelope):
12+
"""Kinesis Data Stream Envelope to extract array of Records
1313
1414
The record's data parameter is a base64 encoded string which is parsed into a bytes array,
1515
though it can also be a JSON encoded string.

aws_lambda_powertools/utilities/parser/models/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
22
from .event_bridge import EventBridgeModel
3-
from .kinesis import KinesisStreamModel, KinesisStreamRecord, KinesisStreamRecordPayload
3+
from .kinesis import KinesisDataStreamRecordPayload, KinesisStreamModel, KinesisStreamRecord
44
from .ses import SesModel, SesRecordModel
55
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
66
from .sqs import SqsModel, SqsRecordModel
@@ -12,7 +12,7 @@
1212
"DynamoDBStreamRecordModel",
1313
"KinesisStreamModel",
1414
"KinesisStreamRecord",
15-
"KinesisStreamRecordPayload",
15+
"KinesisDataStreamRecordPayload",
1616
"SesModel",
1717
"SesRecordModel",
1818
"SnsModel",

aws_lambda_powertools/utilities/parser/models/kinesis.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import base64
2+
import logging
23
from binascii import Error as BinAsciiError
34
from typing import List
45

56
from pydantic import BaseModel, validator
67
from pydantic.types import PositiveInt
78
from typing_extensions import Literal
89

10+
logger = logging.getLogger(__name__)
911

10-
class KinesisStreamRecordPayload(BaseModel):
12+
13+
class KinesisDataStreamRecordPayload(BaseModel):
1114
kinesisSchemaVersion: str
1215
partitionKey: str
1316
sequenceNumber: PositiveInt
@@ -17,6 +20,7 @@ class KinesisStreamRecordPayload(BaseModel):
1720
@validator("data", pre=True)
1821
def data_base64_decode(cls, value):
1922
try:
23+
logger.debug("Decoding base64 Kinesis data record before parsing")
2024
return base64.b64decode(value)
2125
except (BinAsciiError, TypeError):
2226
raise ValueError("base64 decode failed")
@@ -30,7 +34,7 @@ class KinesisStreamRecord(BaseModel):
3034
invokeIdentityArn: str
3135
awsRegion: str
3236
eventSourceARN: str
33-
kinesis: KinesisStreamRecordPayload
37+
kinesis: KinesisDataStreamRecordPayload
3438

3539

3640
class KinesisStreamModel(BaseModel):

tests/functional/parser/test_kinesis.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
import pytest
44

55
from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
6-
from aws_lambda_powertools.utilities.parser.models import KinesisStreamModel, KinesisStreamRecordPayload
6+
from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecordPayload, KinesisStreamModel
77
from aws_lambda_powertools.utilities.typing import LambdaContext
88
from tests.functional.parser.schemas import MyKinesisBusiness
99
from tests.functional.parser.utils import load_event
1010

1111

12-
@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisEnvelope)
12+
@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisDataStreamEnvelope)
1313
def handle_kinesis(event: List[MyKinesisBusiness], _: LambdaContext):
1414
assert len(event) == 1
1515
record: KinesisStreamModel = event[0]
@@ -31,7 +31,7 @@ def handle_kinesis_no_envelope(event: KinesisStreamModel, _: LambdaContext):
3131
assert record.eventVersion == "1.0"
3232
assert record.invokeIdentityArn == "arn:aws:iam::123456789012:role/lambda-role"
3333

34-
kinesis: KinesisStreamRecordPayload = record.kinesis
34+
kinesis: KinesisDataStreamRecordPayload = record.kinesis
3535
assert kinesis.approximateArrivalTimestamp == 1545084650.987
3636
assert kinesis.kinesisSchemaVersion == "1.0"
3737
assert kinesis.partitionKey == "1"

0 commit comments

Comments
 (0)