Skip to content

Commit 1ce6826

Browse files
feat: Implement request-response pattern for mqtt 5 (#19)
1 parent 705adca commit 1ce6826

File tree

12 files changed

+398
-71
lines changed

12 files changed

+398
-71
lines changed

docs/advanced/mqtt5.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,20 @@ async with client.subscribe(
8888
!!! warning
8989
`no_local` and `retain_as_published` raise `RuntimeError` when used on a `version="3.1.1"` connection.
9090

91+
## Request / response (`client.request()`)
92+
93+
Send a request and await exactly one reply in a single call:
94+
95+
```python
96+
reply = await client.request("services/echo", b"hello", timeout=5.0)
97+
print(reply.payload) # b"hello"
98+
```
99+
100+
zmqtt manages the reply topic subscription, the `response_topic` /
101+
`correlation_data` PUBLISH properties, and cleanup on timeout or
102+
cancellation automatically. See [Request / Response](request-response.md)
103+
for the full API and responder example.
104+
91105
## Enhanced authentication (`client.auth()`)
92106

93107
Send an AUTH packet for extended authentication flows (e.g. SCRAM, Kerberos):

docs/advanced/request-response.md

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Request / Response
2+
3+
MQTT 5.0 defines a first-class request/response pattern via two
4+
`PUBLISH` properties: `response_topic` and `correlation_data`. zmqtt
5+
implements this as a single `await client.request(…)` call.
6+
7+
## Basic usage
8+
9+
```python
10+
from zmqtt import create_client
11+
12+
async with create_client("broker", version="5.0") as client:
13+
reply = await client.request("services/calculator", b"2+2")
14+
print(reply.payload)
15+
```
16+
17+
`request()` handles the full flow automatically:
18+
19+
1. Subscribes to a unique reply topic before publishing.
20+
2. Publishes the request with `response_topic` and `correlation_data` set.
21+
3. Waits for the first message on the reply topic and returns it.
22+
4. Unsubscribes on return, timeout, or cancellation.
23+
24+
## Customising via `PublishProperties`
25+
26+
Pass a `PublishProperties` instance to control any field of the outgoing
27+
PUBLISH. Two fields receive special treatment:
28+
29+
| Field | Behaviour |
30+
| ------------------ | -------------------------------------------------------------------------------------- |
31+
| `response_topic` | Used as the reply topic instead of the auto-generated one. Must not contain wildcards. |
32+
| `correlation_data` | Forwarded to the responder as-is. Auto-generated (16 random bytes) when absent. |
33+
34+
All other fields (`content_type`, `message_expiry_interval`,
35+
`user_properties`, …) are forwarded unchanged.
36+
37+
```python
38+
from zmqtt import PublishProperties
39+
40+
reply = await client.request(
41+
"services/translate",
42+
b"hello",
43+
properties=PublishProperties(
44+
content_type="text/plain",
45+
response_topic="my-app/replies/translate",
46+
correlation_data=b"req-001",
47+
),
48+
timeout=10.0,
49+
)
50+
```
51+
52+
## Implementing a responder
53+
54+
The responder reads `response_topic` and `correlation_data` from the
55+
incoming message and publishes the reply there:
56+
57+
```python
58+
async with client.subscribe("services/translate") as sub:
59+
async for msg in sub:
60+
assert msg.properties is not None
61+
result = translate(msg.payload)
62+
await client.publish(
63+
msg.properties.response_topic,
64+
result,
65+
properties=PublishProperties(
66+
correlation_data=msg.properties.correlation_data,
67+
),
68+
)
69+
```
70+
71+
## Timeout
72+
73+
`request()` raises `asyncio.TimeoutError` when no reply arrives within
74+
`timeout` seconds (default `30.0`). The reply subscription is always
75+
cleaned up, even on timeout or cancellation.
76+
77+
```python
78+
import asyncio
79+
80+
try:
81+
reply = await client.request("slow/service", b"ping", timeout=5.0)
82+
except asyncio.TimeoutError:
83+
print("Service did not respond in time")
84+
```
85+
86+
## Errors
87+
88+
| Exception | Raised when |
89+
| ----------------------- | ------------------------------------------------- |
90+
| `RuntimeError` | `request()` is called on an MQTT 3.1.1 connection |
91+
| `MQTTInvalidTopicError` | `properties.response_topic` contains wildcards |
92+
| `MQTTDisconnectedError` | Connection is lost while waiting for the reply |
93+
| `asyncio.TimeoutError` | No reply arrives within `timeout` seconds |
94+
95+
!!! note
96+
`request()` is only available on MQTT 5.0 connections. Use
97+
`create_client(…, version="5.0")` or `MQTTClient(…, version="5.0")`.
98+
99+
---
100+
101+
**See also:** [MQTT 5.0](mqtt5.md) · [Publishing](../publishing.md) · [Error Handling](../error-handling.md)

docs/api-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,7 @@
9696
::: zmqtt.MQTTTimeoutError
9797
options:
9898
show_source: false
99+
100+
::: zmqtt.MQTTInvalidTopicError
101+
options:
102+
show_source: false

docs/error-handling.md

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ MQTTError
77
├── MQTTConnectError # CONNACK refused (return_code attribute)
88
├── MQTTProtocolError # malformed or unexpected packet
99
├── MQTTDisconnectedError # connection lost unexpectedly
10-
└── MQTTTimeoutError # ping or operation timed out
10+
├── MQTTTimeoutError # ping or operation timed out
11+
└── MQTTInvalidTopicError # topic string failed MQTT validation
1112
```
1213

1314
All exceptions are importable from `zmqtt`:
@@ -19,6 +20,7 @@ from zmqtt import (
1920
MQTTProtocolError,
2021
MQTTDisconnectedError,
2122
MQTTTimeoutError,
23+
MQTTInvalidTopicError,
2224
)
2325
```
2426

@@ -71,6 +73,62 @@ except MQTTTimeoutError:
7173

7274
See [Manual Ping](advanced/ping.md) for the full `ping()` API.
7375

76+
### `MQTTInvalidTopicError`
77+
78+
Raised when a topic string fails MQTT validation. The check happens eagerly —
79+
before any I/O — in `publish()`, `subscribe()`, and `request()`.
80+
81+
**`publish()` — topic name rules:**
82+
83+
- Must not be empty.
84+
- Must not contain `+` or `#` (wildcards are for filters only).
85+
- `$` is only valid as the very first character.
86+
87+
```python
88+
from zmqtt import MQTTInvalidTopicError
89+
90+
try:
91+
await client.publish("sensors/+/temp", b"22.5")
92+
except MQTTInvalidTopicError as e:
93+
print(e) # Wildcards not allowed in publish topic: 'sensors/+/temp'
94+
```
95+
96+
**`subscribe()` — topic filter rules:**
97+
98+
- Must not be empty.
99+
- `#` must be the last character and, if not the only character, must be
100+
preceded by `/`.
101+
- `+` must occupy an entire level (e.g. `a/+/b` is valid; `a/temp+/b` is not).
102+
- `$` is only valid as the very first character.
103+
104+
```python
105+
try:
106+
client.subscribe("sensors#") # missing preceding '/'
107+
client.subscribe("sensors/temp+/data") # '+' not a full level
108+
except MQTTInvalidTopicError as e:
109+
print(e)
110+
```
111+
112+
**`request()` — response topic rules:**
113+
114+
The `response_topic` property follows the same rules as a publish topic (no
115+
wildcards):
116+
117+
```python
118+
from zmqtt import MQTTInvalidTopicError, PublishProperties
119+
120+
try:
121+
await client.request(
122+
"cmd",
123+
b"x",
124+
properties=PublishProperties(response_topic="reply/+/bad"),
125+
)
126+
except MQTTInvalidTopicError as e:
127+
print(e)
128+
```
129+
130+
See [Request / Response](advanced/request-response.md) for details.
131+
74132
## Reconnection interaction
75133

76134
When `ReconnectConfig(enabled=True)` (the default), `MQTTDisconnectedError` and `MQTTTimeoutError` inside the protocol loop are suppressed and the client reconnects with exponential backoff. Your `async for msg in sub` loop keeps waiting — it will resume delivering messages once the connection is restored.

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ nav:
3333
- Reconnection: advanced/reconnection.md
3434
- Ping: advanced/ping.md
3535
- MQTT 5.0: advanced/mqtt5.md
36+
- Request / Response: advanced/request-response.md
3637
- Backpressure: advanced/backpressure.md
3738
- Scaling: advanced/scaling.md
3839
- Error Handling: error-handling.md

src/zmqtt/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
MQTTConnectError,
1515
MQTTDisconnectedError,
1616
MQTTError,
17+
MQTTInvalidTopicError,
1718
MQTTProtocolError,
1819
MQTTTimeoutError,
1920
)
@@ -27,6 +28,7 @@
2728
"MQTTConnectError",
2829
"MQTTDisconnectedError",
2930
"MQTTError",
31+
"MQTTInvalidTopicError",
3032
"MQTTProtocolError",
3133
"MQTTTimeoutError",
3234
"Message",

src/zmqtt/_internal/types/topic.py

Lines changed: 13 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,38 @@
1-
from typing_extensions import Self
1+
from zmqtt.errors import MQTTInvalidTopicError
22

33

4-
def _validate_topic_name(topic: str) -> None:
4+
def validate_publish(topic: str) -> None:
55
if not topic:
66
msg = "Topic must not be empty"
7-
raise ValueError(msg)
7+
raise MQTTInvalidTopicError(msg)
88
if "#" in topic or "+" in topic:
99
msg = f"Wildcards not allowed in publish topic: {topic!r}"
10-
raise ValueError(msg)
10+
raise MQTTInvalidTopicError(msg)
1111
if "$" in topic[1:]:
1212
msg = f"'$' is only valid as the first character of a topic: {topic!r}"
13-
raise ValueError(
14-
msg,
15-
)
13+
raise MQTTInvalidTopicError(msg)
1614

1715

18-
def _validate_topic_filter(topic: str) -> None:
16+
def validate_subscribe_topic(topic: str) -> None:
1917
if not topic:
2018
msg = "Topic filter must not be empty"
21-
raise ValueError(msg)
19+
raise MQTTInvalidTopicError(msg)
2220
if "$" in topic[1:]:
2321
msg = f"'$' is only valid as the first character of a topic filter: {topic!r}"
24-
raise ValueError(
25-
msg,
26-
)
22+
raise MQTTInvalidTopicError(msg)
2723
if "#" in topic:
2824
idx = topic.index("#")
2925
if idx != len(topic) - 1:
3026
msg = f"'#' must be the last character in a topic filter: {topic!r}"
31-
raise ValueError(
32-
msg,
33-
)
27+
raise MQTTInvalidTopicError(msg)
3428
if idx > 0 and topic[idx - 1] != "/":
3529
msg = f"'#' must be preceded by '/' in a topic filter: {topic!r}"
36-
raise ValueError(
37-
msg,
38-
)
30+
raise MQTTInvalidTopicError(msg)
3931
for level in topic.split("/"):
4032
if "+" in level and level != "+":
4133
msg = f"'+' must occupy an entire topic level in filter: {topic!r}"
42-
raise ValueError(
43-
msg,
44-
)
34+
raise MQTTInvalidTopicError(msg)
4535

4636

47-
class Topic(str):
48-
"""Validated publish topic — no wildcards, '$' only as first character."""
49-
50-
__slots__ = ()
51-
52-
def __new__(cls, value: str) -> Self:
53-
_validate_topic_name(value)
54-
return super().__new__(cls, value)
55-
56-
57-
class TopicFilter(str):
58-
"""Validated subscription filter. Wildcards allowed, '$' only as first character."""
59-
60-
__slots__ = ()
61-
62-
def __new__(cls, value: str) -> Self:
63-
_validate_topic_filter(value)
64-
return super().__new__(cls, value)
37+
def validate_response_topic(topic: str) -> None:
38+
validate_publish(topic)

0 commit comments

Comments
 (0)