diff --git a/Makefile b/Makefile index 39c92a121a5..6750db18446 100644 --- a/Makefile +++ b/Makefile @@ -7,18 +7,18 @@ target: dev: pip install --upgrade pip pre-commit poetry @$(MAKE) dev-version-plugin - poetry install --extras "all redis datamasking" + poetry install --extras "all redis datamasking valkey" pre-commit install dev-quality-code: pip install --upgrade pip pre-commit poetry @$(MAKE) dev-version-plugin - poetry install --extras "all redis datamasking" + poetry install --extras "all redis datamasking valkey" pre-commit install dev-gitpod: pip install --upgrade pip poetry - poetry install --extras "all redis datamasking" + poetry install --extras "all redis datamasking valkey" pre-commit install format-check: diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/cache.py b/aws_lambda_powertools/utilities/idempotency/persistence/cache.py new file mode 100644 index 00000000000..fcd2c37c7c1 --- /dev/null +++ b/aws_lambda_powertools/utilities/idempotency/persistence/cache.py @@ -0,0 +1,11 @@ +from aws_lambda_powertools.utilities.idempotency.persistence.redis import ( + CacheClientProtocol, + CacheConnection, + CachePersistenceLayer, +) + +__all__ = [ + "CacheClientProtocol", + "CachePersistenceLayer", + "CacheConnection", +] diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/redis.py b/aws_lambda_powertools/utilities/idempotency/persistence/redis.py index 7f27566cc24..d1c490ee0f3 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/redis.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/redis.py @@ -8,6 +8,7 @@ from typing import Any, Literal, Protocol import redis +from typing_extensions import TypeAlias, deprecated from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer from aws_lambda_powertools.utilities.idempotency.exceptions import ( @@ -25,11 +26,12 @@ logger = logging.getLogger(__name__) +@deprecated("RedisPersistenceLayer will be removed in v4.0.0. Please use CacheProtocol instead.") class RedisClientProtocol(Protocol): """ - Protocol class defining the interface for a Redis client. + Protocol class defining the interface for a Cache client. - This protocol outlines the expected behavior of a Redis client, allowing for + This protocol outlines the expected behavior of a Cache client, allowing for standardization among different implementations and allowing customers to extend it in their own implementation. @@ -78,6 +80,7 @@ def delete(self, keys: bytes | str | memoryview) -> Any: raise NotImplementedError +@deprecated("RedisConnection will be removed in v4.0.0. Please use CacheConnection instead.") class RedisConnection: def __init__( self, @@ -85,32 +88,32 @@ def __init__( host: str = "", port: int = 6379, username: str = "", - password: str = "", # nosec - password for Redis connection + password: str = "", # nosec - password for Cache connection db_index: int = 0, mode: Literal["standalone", "cluster"] = "standalone", ssl: bool = True, ) -> None: """ - Initialize Redis connection which will be used in Redis persistence_store to support Idempotency + Initialize Cache connection which will be used in Cache persistence_store to support Idempotency Parameters ---------- host: str, optional - Redis host + Cache host port: int, optional: default 6379 - Redis port + Cache port username: str, optional - Redis username + Cache username password: str, optional - Redis password + Cache password url: str, optional - Redis connection string, using url will override the host/port in the previous parameters + Cache connection string, using url will override the host/port in the previous parameters db_index: int, optional: default 0 - Redis db index + Cache db index mode: str, Literal["standalone","cluster"] - set Redis client mode, choose from standalone/cluster. The default is standalone + set Cache client mode, choose from standalone/cluster. The default is standalone ssl: bool, optional: default True - set whether to use ssl for Redis connection + set whether to use ssl for Cache connection Example -------- @@ -122,13 +125,13 @@ def __init__( from aws_lambda_powertools.utilities.idempotency import ( idempotent, ) - from aws_lambda_powertools.utilities.idempotency.persistence.redis import ( - RedisCachePersistenceLayer, + from aws_lambda_powertools.utilities.idempotency.persistence.cache import ( + CachePersistenceLayer, ) from aws_lambda_powertools.utilities.typing import LambdaContext - persistence_layer = RedisCachePersistenceLayer(host="localhost", port=6379) + persistence_layer = CachePersistenceLayer(host="localhost", port=6379) @dataclass @@ -181,15 +184,15 @@ def _init_client(self) -> RedisClientProtocol: try: if self.url: - logger.debug(f"Using URL format to connect to Redis: {self.host}") + logger.debug(f"Using URL format to connect to Cache: {self.host}") return client.from_url(url=self.url) else: - # Redis in cluster mode doesn't support db parameter + # Cache in cluster mode doesn't support db parameter extra_param_connection: dict[str, Any] = {} if self.mode != "cluster": extra_param_connection = {"db": self.db_index} - logger.debug(f"Using arguments to connect to Redis: {self.host}") + logger.debug(f"Using arguments to connect to Cache: {self.host}") return client( host=self.host, port=self.port, @@ -200,10 +203,11 @@ def _init_client(self) -> RedisClientProtocol: **extra_param_connection, ) except redis.exceptions.ConnectionError as exc: - logger.debug(f"Cannot connect in Redis: {self.host}") - raise IdempotencyPersistenceConnectionError("Could not to connect to Redis", exc) from exc + logger.debug(f"Cannot connect to Cache endpoint: {self.host}") + raise IdempotencyPersistenceConnectionError("Could not to connect to Cache endpoint", exc) from exc +@deprecated("RedisCachePersistenceLayer will be removed in v4.0.0. Please use CachePersistenceLayer instead.") class RedisCachePersistenceLayer(BasePersistenceLayer): def __init__( self, @@ -211,7 +215,7 @@ def __init__( host: str = "", port: int = 6379, username: str = "", - password: str = "", # nosec - password for Redis connection + password: str = "", # nosec - password for Cache connection db_index: int = 0, mode: Literal["standalone", "cluster"] = "standalone", ssl: bool = True, @@ -223,39 +227,39 @@ def __init__( validation_key_attr: str = "validation", ): """ - Initialize the Redis Persistence Layer + Initialize the Cache Persistence Layer Parameters ---------- host: str, optional - Redis host + Cache host port: int, optional: default 6379 - Redis port + Cache port username: str, optional - Redis username + Cache username password: str, optional - Redis password + Cache password url: str, optional - Redis connection string, using url will override the host/port in the previous parameters + Cache connection string, using url will override the host/port in the previous parameters db_index: int, optional: default 0 - Redis db index + Cache db index mode: str, Literal["standalone","cluster"] - set Redis client mode, choose from standalone/cluster + set Cache client mode, choose from standalone/cluster ssl: bool, optional: default True - set whether to use ssl for Redis connection - client: RedisClientProtocol, optional - Bring your own Redis client that follows RedisClientProtocol. + set whether to use ssl for Cache connection + client: CacheClientProtocol, optional + Bring your own Cache client that follows CacheClientProtocol. If provided, all other connection configuration options will be ignored expiry_attr: str, optional - Redis json attribute name for expiry timestamp, by default "expiration" + Cache json attribute name for expiry timestamp, by default "expiration" in_progress_expiry_attr: str, optional - Redis json attribute name for in-progress expiry timestamp, by default "in_progress_expiration" + Cache json attribute name for in-progress expiry timestamp, by default "in_progress_expiration" status_attr: str, optional - Redis json attribute name for status, by default "status" + Cache json attribute name for status, by default "status" data_attr: str, optional - Redis json attribute name for response data, by default "data" + Cache json attribute name for response data, by default "data" validation_key_attr: str, optional - Redis json attribute name for hashed representation of the parts of the event used for validation + Cache json attribute name for hashed representation of the parts of the event used for validation Examples -------- @@ -266,8 +270,8 @@ def __init__( idempotent, ) - from aws_lambda_powertools.utilities.idempotency.persistence.redis import ( - RedisCachePersistenceLayer, + from aws_lambda_powertools.utilities.idempotency.persistence.cache import ( + CachePersistenceLayer, ) client = redis.Redis( @@ -275,7 +279,7 @@ def __init__( port="6379", decode_responses=True, ) - persistence_layer = RedisCachePersistenceLayer(client=client) + persistence_layer = CachePersistenceLayer(client=client) @idempotent(persistence_store=persistence_layer) def lambda_handler(event: dict, context: LambdaContext): @@ -288,7 +292,7 @@ def lambda_handler(event: dict, context: LambdaContext): ``` """ - # Initialize Redis client with Redis config if no client is passed in + # Initialize Cache client with cache config if no client is passed in if client is None: self.client = RedisConnection( host=host, @@ -330,11 +334,11 @@ def _item_to_data_record(self, idempotency_key: str, item: dict[str, Any]) -> Da in_progress_expiry_timestamp=in_progress_expiry_timestamp, response_data=str(item.get(self.data_attr)), payload_hash=str(item.get(self.validation_key_attr)), - expiry_timestamp=item.get("expiration", None), + expiry_timestamp=item.get("expiration"), ) def _get_record(self, idempotency_key) -> DataRecord: - # See: https://redis.io/commands/get/ + # See: https://valkey.io/valkey-glide/python/core/#glide.async_commands.CoreCommands.set response = self.client.get(idempotency_key) # key not found @@ -384,25 +388,25 @@ def _put_in_progress_record(self, data_record: DataRecord) -> None: # The idempotency key does not exist: # - first time that this invocation key is used # - previous invocation with the same key was deleted due to TTL - # - SET see https://redis.io/commands/set/ + # - SET see https://valkey.io/valkey-glide/python/core/#glide.async_commands.CoreCommands.set - logger.debug(f"Putting record on Redis for idempotency key: {data_record.idempotency_key}") + logger.debug(f"Putting record on Cache for idempotency key: {data_record.idempotency_key}") encoded_item = self._json_serializer(item["mapping"]) ttl = self._get_expiry_second(expiry_timestamp=data_record.expiry_timestamp) - redis_response = self.client.set(name=data_record.idempotency_key, value=encoded_item, ex=ttl, nx=True) + cache_response = self.client.set(name=data_record.idempotency_key, value=encoded_item, ex=ttl, nx=True) - # If redis_response is True, the Redis SET operation was successful and the idempotency key was not + # If cache_response is True, the Cache SET operation was successful and the idempotency key was not # previously set. This indicates that we can safely proceed to the handler execution phase. # Most invocations should successfully proceed past this point. - if redis_response: + if cache_response: return - # If redis_response is None, it indicates an existing record in Redis for the given idempotency key. + # If cache_response is None, it indicates an existing record in Cache for the given idempotency key. # This could be due to: # - An active idempotency record from a previous invocation that has not yet expired. # - An orphan record where a previous invocation has timed out. - # - An expired idempotency record that has not been deleted by Redis. + # - An expired idempotency record that has not been deleted by Cache. # In any case, we proceed to retrieve the record for further inspection. idempotency_record = self._get_record(data_record.idempotency_key) @@ -427,7 +431,7 @@ def _put_in_progress_record(self, data_record: DataRecord) -> None: # Reaching this point indicates that the idempotency record found is an orphan record. An orphan record is # one that is neither completed nor in-progress within its expected time frame. It may result from a - # previous invocation that has timed out or an expired record that has yet to be cleaned up by Redis. + # previous invocation that has timed out or an expired record that has yet to be cleaned up by Cache. # We raise an error to handle this exceptional scenario appropriately. raise IdempotencyPersistenceConsistencyError @@ -435,24 +439,22 @@ def _put_in_progress_record(self, data_record: DataRecord) -> None: # Handle an orphan record by attempting to acquire a lock, which by default lasts for 10 seconds. # The purpose of acquiring the lock is to prevent race conditions with other processes that might # also be trying to handle the same orphan record. Once the lock is acquired, we set a new value - # for the idempotency record in Redis with the appropriate time-to-live (TTL). + # for the idempotency record in Cache with the appropriate time-to-live (TTL). with self._acquire_lock(name=item["name"]): self.client.set(name=item["name"], value=encoded_item, ex=ttl) # Not removing the lock here serves as a safeguard against race conditions, # preventing another operation from mistakenly treating this record as an orphan while the # current operation is still in progress. - except (redis.exceptions.RedisError, redis.exceptions.RedisClusterException) as e: - raise e except Exception as e: - logger.debug(f"encountered non-Redis exception: {e}") - raise e + logger.debug(f"An error occurred: {e}") + raise @contextmanager def _acquire_lock(self, name: str): """ Attempt to acquire a lock for a specified resource name, with a default timeout. - This context manager attempts to set a lock using Redis to prevent concurrent + This context manager attempts to set a lock using Cache to prevent concurrent access to a resource identified by 'name'. It uses the 'nx' flag to ensure that the lock is only set if it does not already exist, thereby enforcing mutual exclusion. """ @@ -496,9 +498,9 @@ def _update_record(self, data_record: DataRecord) -> None: def _delete_record(self, data_record: DataRecord) -> None: """ - Deletes the idempotency record associated with a given DataRecord from Redis. + Deletes the idempotency record associated with a given DataRecord from Cache. This function is designed to be called after a Lambda handler invocation has completed processing. - It ensures that the idempotency key associated with the DataRecord is removed from Redis to + It ensures that the idempotency key associated with the DataRecord is removed from Cache to prevent future conflicts and to maintain the idempotency integrity. Note: it is essential that the idempotency key is not empty, as that would indicate the Lambda @@ -506,5 +508,10 @@ def _delete_record(self, data_record: DataRecord) -> None: """ logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}") - # See: https://redis.io/commands/del/ + # See: https://valkey.io/valkey-glide/python/core/#glide.async_commands.CoreCommands.delete self.client.delete(data_record.idempotency_key) + + +CachePersistenceLayer: TypeAlias = RedisCachePersistenceLayer +CacheClientProtocol: TypeAlias = RedisClientProtocol +CacheConnection: TypeAlias = RedisConnection diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 97ffd38903b..7786813b9e4 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -12,7 +12,7 @@ The idempotency utility allows you to retry operations within a time window with * Produces the previous successful result when a function is called repeatedly with the same idempotency key * Choose your idempotency key from one or more fields, or entire payload * Safeguard concurrent requests, timeouts, missing idempotency keys, and payload tampering -* Support for Amazon DynamoDB, Redis, bring your own persistence layer, and in-memory caching +* Support for Amazon DynamoDB, Valkey, Redis OSS, or any Redis-compatible cache as the persistence layer ## Terminology @@ -82,7 +82,7 @@ To start, you'll need: --- - [Amazon DynamoDB](#dynamodb-table) or [Redis](#redis-database) + [Amazon DynamoDB](#dynamodb-table) or [Valkey/Redis OSS/Redis compatible](#cache-database) * :simple-awslambda:{ .lg .middle } **AWS Lambda function** @@ -139,13 +139,13 @@ You **can** use a single DynamoDB table for all functions annotated with Idempot * **Old boto3 versions can increase costs**. For cost optimization, we use a conditional `PutItem` to always lock a new idempotency record. If locking fails, it means we already have an idempotency record saving us an additional `GetItem` call. However, this is only supported in boto3 `1.26.194` and higher _([June 30th 2023](https://aws.amazon.com/about-aws/whats-new/2023/06/amazon-dynamodb-cost-failed-conditional-writes/){target="_blank"})_. -#### Redis database +#### Cache database -We recommend you start with a Redis compatible management services such as [Amazon ElastiCache for Redis](https://aws.amazon.com/elasticache/redis/){target="_blank"} or [Amazon MemoryDB for Redis](https://aws.amazon.com/memorydb/){target="_blank"}. +We recommend starting with a managed cache service, such as [Amazon ElastiCache for Valkey and for Redis OSS](https://aws.amazon.com/elasticache/redis/){target="_blank"} or [Amazon MemoryDB](https://aws.amazon.com/memorydb/){target="_blank"}. In both services, you'll need to configure [VPC access](https://docs.aws.amazon.com/lambda/latest/dg/configuration-vpc.html){target="_blank"} to your AWS Lambda. -##### Redis IaC examples +##### Cache configuration === "AWS CloudFormation example" @@ -160,7 +160,7 @@ In both services, you'll need to configure [VPC access](https://docs.aws.amazon. 1. Replace the Security Group ID and Subnet ID to match your VPC settings. 2. Replace the Security Group ID and Subnet ID to match your VPC settings. -Once setup, you can find a quick start and advanced examples for Redis in [the persistent layers section](#redispersistencelayer). +Once setup, you can find a quick start and advanced examples for Cache in [the persistent layers section](#cachepersistencelayer). @@ -464,17 +464,22 @@ You can customize the attribute names during initialization: | **sort_key_attr** | | | Sort key of the table (if table is configured with a sort key). | | **static_pk_value** | | `idempotency#{LAMBDA_FUNCTION_NAME}` | Static value to use as the partition key. Only used when **sort_key_attr** is set. | -#### RedisPersistenceLayer +#### CachePersistenceLayer -!!! info "We recommend Redis version 7 or higher for optimal performance." +The `CachePersistenceLayer` enables you to use Valkey, Redis OSS, or any Redis-compatible cache as the persistence layer for idempotency state. -For simple setups, initialize `RedisCachePersistenceLayer` with your Redis endpoint and port to connect. +We recommend using [`valkey-glide`](https://pypi.org/project/valkey-glide/){target="_blank"} for Valkey or [`redis`](https://pypi.org/project/redis/){target="_blank"} for Redis. However, any Redis OSS-compatible client should work. -For security, we enforce SSL connections by default; to disable it, set `ssl=False`. +For simple setups, initialize `CachePersistenceLayer` with your Cache endpoint and port to connect. Note that for security, we enforce SSL connections by default; to disable it, set `ssl=False`. -=== "Redis quick start" - ```python title="getting_started_with_idempotency_redis_config.py" hl_lines="8-10 14 27" - --8<-- "examples/idempotency/src/getting_started_with_idempotency_redis_config.py" +=== "Cache quick start" + ```python title="getting_started_with_idempotency_cache_config.py" hl_lines="8-10 14 27" + --8<-- "examples/idempotency/src/getting_started_with_idempotency_cache_config.py" + ``` + +=== "Using an existing Valkey Glide client" + ```python title="getting_started_with_idempotency_valkey_client.py" hl_lines="5 10-12 16-22 24 37" + --8<-- "examples/idempotency/src/getting_started_with_idempotency_valkey_client.py" ``` === "Using an existing Redis client" @@ -488,23 +493,23 @@ For security, we enforce SSL connections by default; to disable it, set `ssl=Fal --8<-- "examples/idempotency/src/getting_started_with_idempotency_payload.json" ``` -##### Redis SSL connections +##### Cache SSL connections We recommend using AWS Secrets Manager to store and rotate certificates safely, and the [Parameters feature](./parameters.md){target="_blank"} to fetch and cache optimally. -For advanced configurations, we recommend using an existing Redis client for optimal compatibility like SSL certificates and timeout. +For advanced configurations, we recommend using an existing Valkey client for optimal compatibility like SSL certificates and timeout. === "Advanced configuration using AWS Secrets" - ```python title="using_redis_client_with_aws_secrets.py" hl_lines="9-11 13 15 25" - --8<-- "examples/idempotency/src/using_redis_client_with_aws_secrets.py" + ```python title="using_cache_client_with_aws_secrets.py" hl_lines="5 9-11 13 15 18 19 23" + --8<-- "examples/idempotency/src/using_cache_client_with_aws_secrets.py" ``` 1. JSON stored: ```json { - "REDIS_ENDPOINT": "127.0.0.1", - "REDIS_PORT": "6379", - "REDIS_PASSWORD": "redis-secret" + "CACHE_HOST": "127.0.0.1", + "CACHE_PORT": "6379", + "CACHE_PASSWORD": "cache-secret" } ``` @@ -516,16 +521,16 @@ For advanced configurations, we recommend using an existing Redis client for opt 1. JSON stored: ```json { - "REDIS_ENDPOINT": "127.0.0.1", - "REDIS_PORT": "6379", - "REDIS_PASSWORD": "redis-secret" + "CACHE_HOST": "127.0.0.1", + "CACHE_PORT": "6379", + "CACHE_PASSWORD": "cache-secret" } ``` - 2. redis_user.crt file stored in the "certs" directory of your Lambda function - 3. redis_user_private.key file stored in the "certs" directory of your Lambda function - 4. redis_ca.pem file stored in the "certs" directory of your Lambda function + 2. cache_user.crt file stored in the "certs" directory of your Lambda function + 3. cache_user_private.key file stored in the "certs" directory of your Lambda function + 4. cache_ca.pem file stored in the "certs" directory of your Lambda function -##### Redis attributes +##### Cache attributes You can customize the attribute names during initialization: @@ -811,28 +816,28 @@ sequenceDiagram Optional idempotency key -#### Race condition with Redis +#### Race condition with Cache