Skip to content

Commit 44a64f7

Browse files
committed
Fix CI
Signed-off-by: Zike Yang <[email protected]>
1 parent d0fe26f commit 44a64f7

File tree

16 files changed

+180
-137
lines changed

16 files changed

+180
-137
lines changed

sdks/fs-python/README.md

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
# FunctionStream Python SDK
1818

19-
FunctionStream SDK is a powerful Python library for building and deploying serverless functions that process messages from Apache Pulsar. It provides a simple yet flexible framework for creating event-driven applications with robust error handling, metrics collection, and resource management.
19+
FunctionStream SDK is a powerful Python library for building and deploying serverless functions that process messages
20+
from Apache Pulsar. It provides a simple yet flexible framework for creating event-driven applications with robust error
21+
handling, metrics collection, and resource management.
2022

2123
## Features
2224

@@ -103,6 +105,7 @@ modules:
103105
### FSFunction
104106

105107
The main class for creating serverless functions. It handles:
108+
106109
- Message consumption and processing
107110
- Response generation
108111
- Resource management
@@ -112,6 +115,7 @@ The main class for creating serverless functions. It handles:
112115
### Configuration
113116

114117
The SDK uses YAML configuration files to define:
118+
115119
- Pulsar connection settings
116120
- Module selection
117121
- Topic subscriptions
@@ -121,6 +125,7 @@ The SDK uses YAML configuration files to define:
121125
### Metrics
122126

123127
Built-in metrics collection for:
128+
124129
- Request processing time
125130
- Success/failure rates
126131
- Message throughput
@@ -138,24 +143,24 @@ Check out the `examples` directory for complete examples:
138143
## Best Practices
139144

140145
1. **Error Handling**
141-
- Always handle exceptions in your process functions
142-
- Use proper logging for debugging
143-
- Implement graceful shutdown
146+
- Always handle exceptions in your process functions
147+
- Use proper logging for debugging
148+
- Implement graceful shutdown
144149

145150
2. **Resource Management**
146-
- Close resources properly
147-
- Use context managers when possible
148-
- Monitor resource usage
151+
- Close resources properly
152+
- Use context managers when possible
153+
- Monitor resource usage
149154

150155
3. **Configuration**
151-
- Use environment variables for sensitive data
152-
- Validate configuration values
153-
- Document configuration options
156+
- Use environment variables for sensitive data
157+
- Validate configuration values
158+
- Document configuration options
154159

155160
4. **Testing**
156-
- Write unit tests for your functions
157-
- Test error scenarios
158-
- Validate input/output schemas
161+
- Write unit tests for your functions
162+
- Test error scenarios
163+
- Validate input/output schemas
159164

160165
## Development
161166

sdks/fs-python/examples/Dockerfile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ WORKDIR /function
55
COPY requirements.txt .
66
RUN pip install -r requirements.txt
77

8-
COPY string_function.py .
9-
COPY config.yaml .
10-
COPY package.yaml .
8+
COPY main.py .
119

12-
CMD ["python", "string_function.py"]
10+
RUN chmod +x main.py
11+
12+
CMD ["python", "main.py"]

sdks/fs-python/examples/string_function.py renamed to sdks/fs-python/examples/main.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
import asyncio
2121
from typing import Dict, Any
2222

23-
from fs_sdk import FSFunction
24-
from fs_sdk.context import FSContext
23+
from function_stream import FSFunction, FSContext
2524

2625
async def string_process_function(context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]:
2726
"""

sdks/fs-python/examples/test_string_function.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import pulsar
2121
import json
2222
import uuid
23-
import time
2423

2524
async def test_string_function():
2625
"""
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
1-
from .function import FSFunction
2-
from .module import FSModule
31
from .config import Config, PulsarConfig, PulsarSourceConfig, SourceSpec, SinkSpec, Metric
42
from .context import FSContext
3+
from .function import FSFunction
54
from .metrics import Metrics, MetricsServer
5+
from .module import FSModule
66

77
__version__ = "0.6.0rc1"
88
__all__ = [
99
# Core classes
1010
"FSFunction",
1111
"FSModule",
12-
12+
1313
# Configuration classes
1414
"Config",
15-
"PulsarConfig",
15+
"PulsarConfig",
1616
"PulsarSourceConfig",
1717
"SourceSpec",
1818
"SinkSpec",
1919
"Metric",
20-
20+
2121
# Context and utilities
2222
"FSContext",
23-
23+
2424
# Metrics and monitoring
2525
"Metrics",
2626
"MetricsServer"
27-
]
27+
]

sdks/fs-python/function_stream/config.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import os
2-
import yaml
32
from typing import Dict, Any, Optional, List
3+
4+
import yaml
45
from pydantic import BaseModel, Field
56

7+
68
class PulsarConfig(BaseModel):
79
"""
810
Configuration for Pulsar connection settings.
@@ -12,16 +14,17 @@ class PulsarConfig(BaseModel):
1214
"""
1315
serviceUrl: str = "pulsar://localhost:6650"
1416
"""Pulsar service URL in format 'pulsar://host:port' or 'pulsar+ssl://host:port' for SSL"""
15-
17+
1618
authPlugin: str = ""
1719
"""Authentication plugin class name (e.g., 'org.apache.pulsar.client.impl.auth.AuthenticationTls')"""
18-
20+
1921
authParams: str = ""
2022
"""Authentication parameters in JSON format or key-value pairs"""
21-
23+
2224
max_concurrent_requests: int = 10
2325
"""Maximum number of concurrent requests allowed for this connection"""
2426

27+
2528
class PulsarSourceConfig(BaseModel):
2629
"""
2730
Configuration for Pulsar source/sink specific settings.
@@ -32,6 +35,7 @@ class PulsarSourceConfig(BaseModel):
3235
topic: str
3336
"""Pulsar topic name to consume from or produce to"""
3437

38+
3539
class SourceSpec(BaseModel):
3640
"""
3741
Specification for data sources.
@@ -42,6 +46,7 @@ class SourceSpec(BaseModel):
4246
pulsar: Optional[PulsarSourceConfig] = None
4347
"""Pulsar source configuration (optional)"""
4448

49+
4550
class SinkSpec(BaseModel):
4651
"""
4752
Specification for data sinks.
@@ -52,6 +57,7 @@ class SinkSpec(BaseModel):
5257
pulsar: Optional[PulsarSourceConfig] = None
5358
"""Pulsar sink configuration (optional)"""
5459

60+
5561
class Metric(BaseModel):
5662
"""
5763
Configuration for metrics and monitoring.
@@ -61,6 +67,7 @@ class Metric(BaseModel):
6167
port: Optional[int] = 9099
6268
"""Port number for metrics endpoint (default: 9099)"""
6369

70+
6471
class Config(BaseModel):
6572
"""
6673
Main configuration class for FunctionStream SDK.
@@ -70,31 +77,31 @@ class Config(BaseModel):
7077
"""
7178
name: Optional[str] = None
7279
"""Function name identifier (optional)"""
73-
80+
7481
description: Optional[str] = None
7582
"""Function description (optional)"""
76-
83+
7784
pulsar: PulsarConfig = Field(default_factory=PulsarConfig)
7885
"""Pulsar connection configuration"""
79-
86+
8087
module: str = "default"
8188
"""Module name for the function (default: 'default')"""
82-
89+
8390
sources: List[SourceSpec] = Field(default_factory=list)
8491
"""List of input data sources"""
85-
92+
8693
requestSource: Optional[SourceSpec] = None
8794
"""Request source configuration for request-response pattern (optional)"""
88-
95+
8996
sink: Optional[SinkSpec] = None
9097
"""Output sink configuration (optional)"""
91-
98+
9299
subscriptionName: str = "function-stream-sdk-subscription"
93100
"""Pulsar subscription name for consuming messages"""
94-
101+
95102
metric: Metric = Field(default_factory=Metric)
96103
"""Metrics and monitoring configuration"""
97-
104+
98105
config: Dict[str, Any] = Field(default_factory=dict)
99106
"""Custom configuration key-value pairs for function-specific settings"""
100107

@@ -118,7 +125,7 @@ def from_yaml(cls, config_path: str = "config.yaml") -> "Config":
118125
"""
119126
if not os.path.exists(config_path):
120127
raise FileNotFoundError(f"Configuration file not found: {config_path}")
121-
128+
122129
with open(config_path, 'r') as f:
123130
config_data = yaml.safe_load(f)
124131
return cls(**config_data)

sdks/fs-python/function_stream/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,4 @@ def get_configs(self) -> Dict[str, Any]:
5757
return self.config.config
5858

5959
def get_module(self) -> str:
60-
return self.config.module
60+
return self.config.module

sdks/fs-python/function_stream/function.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,25 @@
44
This module provides the core functionality for creating and managing FunctionStream
55
functions. It handles message processing, request/response flow, and resource management.
66
"""
7+
import asyncio
78
import dataclasses
8-
import datetime
9+
import functools
10+
import inspect
911
import json
10-
import asyncio
1112
import logging
12-
import pulsar
13+
import os
1314
import time
14-
import functools
15-
import inspect
15+
import typing
16+
from datetime import datetime, timezone
1617
from typing import Callable, Any, Dict, Set, Union, Awaitable, get_type_hints, List, Optional
18+
19+
import pulsar
1720
from pulsar import Client, Producer
21+
1822
from .config import Config
19-
from .metrics import Metrics, MetricsServer
2023
from .context import FSContext
24+
from .metrics import Metrics, MetricsServer
2125
from .module import FSModule
22-
import typing
23-
from datetime import datetime, timezone
2426

2527
# Configure logging
2628
logging.basicConfig(level=logging.INFO)
@@ -91,28 +93,28 @@ def is_none_type(annotation):
9193

9294
def is_awaitable_dict(annotation):
9395
ann = unwrap_annotated(annotation)
94-
origin = typing.get_origin(annotation)
96+
origin = typing.get_origin(ann)
9597
args = typing.get_args(ann)
9698
return origin in (typing.Awaitable,) and len(args) == 1 and is_dict_return(args[0])
9799

98100
def is_awaitable_none(annotation):
99101
ann = unwrap_annotated(annotation)
100-
origin = typing.get_origin(annotation)
102+
origin = typing.get_origin(ann)
101103
args = typing.get_args(ann)
102104
return origin in (typing.Awaitable,) and len(args) == 1 and is_none_type(args[0])
103105

104106
def is_union_of_dict_and_none(annotation):
105107
ann = unwrap_annotated(annotation)
106-
origin = typing.get_origin(annotation)
107-
args = typing.get_args(annotation)
108+
origin = typing.get_origin(ann)
109+
args = typing.get_args(ann)
108110
if origin in (typing.Union, Union):
109111
return (any(is_dict_return(arg) for arg in args) and any(is_none_type(arg) for arg in args))
110112
return False
111113

112114
def is_awaitable_union_dict_none(annotation):
113115
ann = unwrap_annotated(annotation)
114-
origin = typing.get_origin(annotation)
115-
args = typing.get_args(annotation)
116+
origin = typing.get_origin(ann)
117+
args = typing.get_args(ann)
116118
if origin in (typing.Awaitable,):
117119
if len(args) == 1:
118120
return is_union_of_dict_and_none(args[0])
@@ -158,8 +160,9 @@ class FSFunction:
158160
def __init__(
159161
self,
160162
process_funcs: Dict[
161-
str, Union[Callable[["FSContext", Dict[str, Any]], Union[Dict[str, Any], Awaitable[Dict[str, Any]]]], FSModule]],
162-
config_path: str = "config.yaml"
163+
str, Union[Callable[
164+
["FSContext", Dict[str, Any]], Union[Dict[str, Any], Awaitable[Dict[str, Any]]]], FSModule]],
165+
config_path: str = None
163166
):
164167
"""
165168
Initialize the FS Function.
@@ -169,13 +172,15 @@ def __init__(
169172
Each function must accept two parameters: (context: FSContext, data: Dict[str, Any])
170173
and return either a Dict[str, Any] or an Awaitable[Dict[str, Any]].
171174
Each module must be an instance of FSModule.
172-
config_path (str): Path to the configuration file. Defaults to "config.yaml".
175+
config_path (str): Path to the configuration file.
173176
174177
Raises:
175178
ValueError: If no module is specified in config or if the specified module
176179
doesn't have a corresponding process function, or if the function
177180
structure is invalid.
178181
"""
182+
if config_path is None:
183+
config_path = os.getenv("FS_CONFIG_PATH", "config.yaml")
179184
self.config = Config.from_yaml(config_path)
180185
self.process_funcs = process_funcs
181186
self.context = FSContext(self.config)
@@ -217,7 +222,6 @@ def __init__(
217222
self._tasks_lock = asyncio.Lock()
218223
self._consumer = None
219224

220-
221225
# Create multi-topics consumer
222226
self._setup_consumer()
223227

@@ -257,7 +261,7 @@ def _setup_consumer(self):
257261
topics,
258262
subscription_name,
259263
consumer_type=pulsar.ConsumerType.Shared,
260-
unacked_messages_timeout_ms=30_000 # Only for non-ordering guarantee workload
264+
unacked_messages_timeout_ms=30_000 # Only for non-ordering guarantee workload
261265
)
262266
logger.info(f"Created multi-topics consumer for topics: {topics} with subscription: {subscription_name}")
263267

@@ -425,10 +429,12 @@ async def _send_response(self, response_topic: str, request_id: str, msg: List[M
425429
loop = asyncio.get_event_loop()
426430
try:
427431
producer = self._get_producer(response_topic)
432+
428433
def default_serializer(o):
429434
if isinstance(o, datetime):
430435
return o.isoformat()
431436
return str(o)
437+
432438
send_futures = []
433439
for m in msg:
434440
future = loop.create_future()
@@ -442,6 +448,7 @@ def callback(res, msg_id):
442448
loop.call_soon_threadsafe(f.set_exception, err)
443449
else:
444450
loop.call_soon_threadsafe(f.set_result, msg_id)
451+
445452
return callback
446453

447454
event_timestamp = None

0 commit comments

Comments
 (0)