Skip to content

Commit 33f07b9

Browse files
author
Ran Isenberg
committed
feat: Add Kinesis lambda event support to Parser utility
1 parent fa34d82 commit 33f07b9

File tree

6 files changed

+204
-1
lines changed

6 files changed

+204
-1
lines changed
Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
from .base import BaseEnvelope
22
from .dynamodb import DynamoDBStreamEnvelope
33
from .event_bridge import EventBridgeEnvelope
4+
from .kinesis import KinesisEnvelope
45
from .sns import SnsEnvelope
56
from .sqs import SqsEnvelope
67

7-
__all__ = ["DynamoDBStreamEnvelope", "EventBridgeEnvelope", "SnsEnvelope", "SqsEnvelope", "BaseEnvelope"]
8+
__all__ = [
9+
"DynamoDBStreamEnvelope",
10+
"EventBridgeEnvelope",
11+
"KinesisEnvelope",
12+
"SnsEnvelope",
13+
"SqsEnvelope",
14+
"BaseEnvelope",
15+
]
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import logging
2+
from typing import Any, Dict, List, Optional, Union
3+
4+
from ..models import KinesisStreamModel
5+
from ..types import Model
6+
from .base import BaseEnvelope
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class KinesisEnvelope(BaseEnvelope):
12+
"""Kinesis Envelope to extract array of Records
13+
14+
The record's data parameter is a base64 encoded string which is parsed into a bytes array,
15+
though it can also be a JSON encoded string.
16+
Regardless of its type it'll be parsed into a BaseModel object.
17+
18+
Note: Records will be parsed the same way so if model is str,
19+
all items in the list will be parsed as str and npt as JSON (and vice versa)
20+
"""
21+
22+
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
23+
"""Parses records found with model provided
24+
25+
Parameters
26+
----------
27+
data : Dict
28+
Lambda event to be parsed
29+
model : Model
30+
Data model provided to parse after extracting data using envelope
31+
32+
Returns
33+
-------
34+
List
35+
List of records parsed with model provided
36+
"""
37+
logger.debug(f"Parsing incoming data with Kinesis model {KinesisStreamModel}")
38+
parsed_envelope: KinesisStreamModel = KinesisStreamModel.parse_obj(data)
39+
output = []
40+
logger.debug(f"Parsing Kinesis records in `body` with {model}")
41+
for record in parsed_envelope.Records:
42+
output.append(self._parse(data=record.kinesis.data.decode("utf-8"), model=model))
43+
return output

aws_lambda_powertools/utilities/parser/models/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
22
from .event_bridge import EventBridgeModel
3+
from .kinesis import KinesisStreamModel, KinesisStreamRecord, KinesisStreamRecordPayload
34
from .ses import SesModel, SesRecordModel
45
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
56
from .sqs import SqsModel, SqsRecordModel
@@ -9,6 +10,9 @@
910
"EventBridgeModel",
1011
"DynamoDBStreamChangedRecordModel",
1112
"DynamoDBStreamRecordModel",
13+
"KinesisStreamModel",
14+
"KinesisStreamRecord",
15+
"KinesisStreamRecordPayload",
1216
"SesModel",
1317
"SesRecordModel",
1418
"SnsModel",
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import base64
2+
from binascii import Error as BinAsciiError
3+
from typing import List
4+
5+
from pydantic import BaseModel, validator
6+
from pydantic.types import PositiveInt
7+
from typing_extensions import Literal
8+
9+
10+
class KinesisStreamRecordPayload(BaseModel):
11+
kinesisSchemaVersion: str
12+
partitionKey: str
13+
sequenceNumber: PositiveInt
14+
data: bytes # base64 encoded str is parsed into bytes
15+
approximateArrivalTimestamp: float
16+
17+
@validator("data", pre=True)
18+
def data_base64_decode(cls, value):
19+
try:
20+
return base64.b64decode(value)
21+
except (BinAsciiError, TypeError):
22+
raise ValueError("base64 decode failed")
23+
24+
25+
class KinesisStreamRecord(BaseModel):
26+
eventSource: Literal["aws:kinesis"]
27+
eventVersion: str
28+
eventID: str
29+
eventName: Literal["aws:kinesis:record"]
30+
invokeIdentityArn: str
31+
awsRegion: str
32+
eventSourceARN: str
33+
kinesis: KinesisStreamRecordPayload
34+
35+
36+
class KinesisStreamModel(BaseModel):
37+
Records: List[KinesisStreamRecord]

tests/functional/parser/schemas.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,8 @@ class MyAdvancedSnsRecordModel(SnsRecordModel):
7171

7272
class MyAdvancedSnsBusiness(SnsModel):
7373
Records: List[MyAdvancedSnsRecordModel]
74+
75+
76+
class MyKinesisBusiness(BaseModel):
77+
message: str
78+
username: str
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
from typing import Any, List
2+
3+
import pytest
4+
5+
from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
6+
from aws_lambda_powertools.utilities.parser.models import KinesisStreamModel, KinesisStreamRecordPayload
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
from tests.functional.parser.schemas import MyKinesisBusiness
9+
from tests.functional.parser.utils import load_event
10+
11+
12+
@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisEnvelope)
13+
def handle_kinesis(event: List[MyKinesisBusiness], _: LambdaContext):
14+
assert len(event) == 1
15+
record: KinesisStreamModel = event[0]
16+
assert record.message == "test message"
17+
assert record.username == "test"
18+
19+
20+
@event_parser(model=KinesisStreamModel)
21+
def handle_kinesis_no_envelope(event: KinesisStreamModel, _: LambdaContext):
22+
records = event.Records
23+
assert len(records) == 2
24+
record: KinesisStreamModel = records[0]
25+
26+
assert record.awsRegion == "us-east-2"
27+
assert record.eventID == "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"
28+
assert record.eventName == "aws:kinesis:record"
29+
assert record.eventSource == "aws:kinesis"
30+
assert record.eventSourceARN == "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
31+
assert record.eventVersion == "1.0"
32+
assert record.invokeIdentityArn == "arn:aws:iam::123456789012:role/lambda-role"
33+
34+
kinesis: KinesisStreamRecordPayload = record.kinesis
35+
assert kinesis.approximateArrivalTimestamp == 1545084650.987
36+
assert kinesis.kinesisSchemaVersion == "1.0"
37+
assert kinesis.partitionKey == "1"
38+
assert kinesis.sequenceNumber == 49590338271490256608559692538361571095921575989136588898
39+
assert kinesis.data == b"Hello, this is a test."
40+
41+
42+
def test_kinesis_trigger_event():
43+
event_dict = {
44+
"Records": [
45+
{
46+
"kinesis": {
47+
"kinesisSchemaVersion": "1.0",
48+
"partitionKey": "1",
49+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
50+
"data": "eyJtZXNzYWdlIjogInRlc3QgbWVzc2FnZSIsICJ1c2VybmFtZSI6ICJ0ZXN0In0=",
51+
"approximateArrivalTimestamp": 1545084650.987,
52+
},
53+
"eventSource": "aws:kinesis",
54+
"eventVersion": "1.0",
55+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
56+
"eventName": "aws:kinesis:record",
57+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
58+
"awsRegion": "us-east-2",
59+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
60+
}
61+
]
62+
}
63+
64+
handle_kinesis(event_dict, LambdaContext())
65+
66+
67+
def test_kinesis_trigger_bad_base64_event():
68+
event_dict = {
69+
"Records": [
70+
{
71+
"kinesis": {
72+
"kinesisSchemaVersion": "1.0",
73+
"partitionKey": "1",
74+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
75+
"data": "bad",
76+
"approximateArrivalTimestamp": 1545084650.987,
77+
},
78+
"eventSource": "aws:kinesis",
79+
"eventVersion": "1.0",
80+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
81+
"eventName": "aws:kinesis:record",
82+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
83+
"awsRegion": "us-east-2",
84+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
85+
}
86+
]
87+
}
88+
with pytest.raises(ValidationError):
89+
handle_kinesis_no_envelope(event_dict, LambdaContext())
90+
91+
92+
def test_kinesis_trigger_event_no_envelope():
93+
event_dict = load_event("kinesisStreamEvent.json")
94+
handle_kinesis_no_envelope(event_dict, LambdaContext())
95+
96+
97+
def test_validate_event_does_not_conform_with_model_no_envelope():
98+
event_dict: Any = {"hello": "s"}
99+
with pytest.raises(ValidationError):
100+
handle_kinesis_no_envelope(event_dict, LambdaContext())
101+
102+
103+
def test_validate_event_does_not_conform_with_model():
104+
event_dict: Any = {"hello": "s"}
105+
with pytest.raises(ValidationError):
106+
handle_kinesis(event_dict, LambdaContext())

0 commit comments

Comments
 (0)