Skip to content

Commit 9caf11a

Browse files
committed
fix daemon session lifecycle and log handling
1 parent 81780fa commit 9caf11a

10 files changed

Lines changed: 715 additions & 177 deletions

File tree

src/cli/mod.rs

Lines changed: 71 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,37 @@ pub mod serve;
44
use crate::cli::client::{new_client, new_streaming_client, DEFAULT_ADDR};
55
use anyhow::{Context, Result};
66

7+
async fn read_json_response(resp: reqwest::Response) -> Result<serde_json::Value> {
8+
let status = resp.status();
9+
let body = resp.json::<serde_json::Value>().await?;
10+
if !status.is_success() {
11+
anyhow::bail!("{}", serde_json::to_string_pretty(&body)?);
12+
}
13+
Ok(body)
14+
}
15+
16+
async fn read_text_response(resp: reqwest::Response) -> Result<String> {
17+
let status = resp.status();
18+
let body = resp.text().await?;
19+
if !status.is_success() {
20+
anyhow::bail!("{body}");
21+
}
22+
Ok(body)
23+
}
24+
725
/// If `id` is Some, return it. If None and exactly one session is active, return its ID.
826
/// Otherwise error.
927
async fn resolve_id(id: Option<String>) -> Result<String> {
1028
if let Some(id) = id {
1129
return Ok(id);
1230
}
13-
let sessions = new_client()
14-
.get(format!("{DEFAULT_ADDR}/v1/sessions"))
15-
.send()
16-
.await?
17-
.json::<serde_json::Value>()
18-
.await?;
31+
let sessions = read_json_response(
32+
new_client()
33+
.get(format!("{DEFAULT_ADDR}/v1/sessions"))
34+
.send()
35+
.await?,
36+
)
37+
.await?;
1938
let arr = sessions["sessions"]
2039
.as_array()
2140
.ok_or_else(|| anyhow::anyhow!("unexpected response from daemon"))?;
@@ -46,66 +65,68 @@ pub fn load_watch_paths(mut watch: Vec<String>, watch_file: Option<String>) -> R
4665

4766
pub async fn cmd_ls() -> Result<()> {
4867
let client = new_client();
49-
let resp = client
50-
.get(format!("{DEFAULT_ADDR}/v1/sessions"))
51-
.send()
52-
.await?
53-
.json::<serde_json::Value>()
54-
.await?;
68+
let resp = read_json_response(
69+
client
70+
.get(format!("{DEFAULT_ADDR}/v1/sessions"))
71+
.send()
72+
.await?,
73+
)
74+
.await?;
5575
println!("{}", serde_json::to_string_pretty(&resp)?);
5676
Ok(())
5777
}
5878

5979
pub async fn cmd_inspect(id: Option<String>) -> Result<()> {
6080
let id = resolve_id(id).await?;
6181
let client = new_client();
62-
let resp = client
82+
let body = read_json_response(
83+
client
6384
.get(format!("{DEFAULT_ADDR}/v1/sessions/{id}"))
6485
.send()
65-
.await?;
66-
let status = resp.status();
67-
let body = resp.json::<serde_json::Value>().await?;
68-
if !status.is_success() {
69-
anyhow::bail!("{}", serde_json::to_string_pretty(&body)?);
70-
}
86+
.await?,
87+
)
88+
.await?;
7189
println!("{}", serde_json::to_string_pretty(&body)?);
7290
Ok(())
7391
}
7492

7593
pub async fn cmd_restart(id: Option<String>) -> Result<()> {
7694
let id = resolve_id(id).await?;
7795
let client = new_client();
78-
let resp = client
96+
let resp = read_json_response(
97+
client
7998
.post(format!("{DEFAULT_ADDR}/v1/sessions/{id}/restart"))
8099
.send()
81-
.await?
82-
.json::<serde_json::Value>()
83-
.await?;
100+
.await?,
101+
)
102+
.await?;
84103
println!("{}", serde_json::to_string_pretty(&resp)?);
85104
Ok(())
86105
}
87106

88107
pub async fn cmd_stop(id: Option<String>) -> Result<()> {
89108
let id = resolve_id(id).await?;
90109
let client = new_client();
91-
let resp = client
110+
let resp = read_json_response(
111+
client
92112
.post(format!("{DEFAULT_ADDR}/v1/sessions/{id}/stop"))
93113
.send()
94-
.await?
95-
.json::<serde_json::Value>()
96-
.await?;
114+
.await?,
115+
)
116+
.await?;
97117
println!("{}", serde_json::to_string_pretty(&resp)?);
98118
Ok(())
99119
}
100120

101121
pub async fn cmd_stop_all() -> Result<()> {
102122
let client = new_client();
103-
let sessions = client
104-
.get(format!("{DEFAULT_ADDR}/v1/sessions"))
105-
.send()
106-
.await?
107-
.json::<serde_json::Value>()
108-
.await?;
123+
let sessions = read_json_response(
124+
client
125+
.get(format!("{DEFAULT_ADDR}/v1/sessions"))
126+
.send()
127+
.await?,
128+
)
129+
.await?;
109130
let ids: Vec<String> = sessions["sessions"]
110131
.as_array()
111132
.map(|arr| {
@@ -119,12 +140,13 @@ pub async fn cmd_stop_all() -> Result<()> {
119140
return Ok(());
120141
}
121142
for id in &ids {
122-
let resp = client
143+
let resp = read_json_response(
144+
client
123145
.post(format!("{DEFAULT_ADDR}/v1/sessions/{id}/stop"))
124146
.send()
125-
.await?
126-
.json::<serde_json::Value>()
127-
.await?;
147+
.await?,
148+
)
149+
.await?;
128150
println!("{id}: {}", resp["state"].as_str().unwrap_or("?"));
129151
}
130152
Ok(())
@@ -142,18 +164,20 @@ pub async fn cmd_tail(id: Option<String>, follow: bool) -> Result<()> {
142164
.send()
143165
.await
144166
.context("connecting to daemon for log stream")?;
167+
resp.error_for_status_ref()?;
145168
while let Some(chunk) = resp.chunk().await.context("reading log stream")? {
146169
print!("{}", String::from_utf8_lossy(&chunk));
147170
}
148171
} else {
149-
let text = new_client()
172+
let text = read_text_response(
173+
new_client()
150174
.get(&url)
151175
.send()
152176
.await
153-
.context("GET tail")?
154-
.text()
155-
.await
156-
.context("reading tail response")?;
177+
.context("GET tail")?,
178+
)
179+
.await
180+
.context("reading tail response")?;
157181
print!("{text}");
158182
}
159183
Ok(())
@@ -162,12 +186,13 @@ pub async fn cmd_tail(id: Option<String>, follow: bool) -> Result<()> {
162186
pub async fn cmd_head(id: Option<String>) -> Result<()> {
163187
let id = resolve_id(id).await?;
164188
let client = new_client();
165-
let text = client
189+
let text = read_text_response(
190+
client
166191
.get(format!("{DEFAULT_ADDR}/v1/sessions/{id}/head?format=text"))
167192
.send()
168-
.await?
169-
.text()
170-
.await?;
193+
.await?,
194+
)
195+
.await?;
171196
print!("{text}");
172197
Ok(())
173198
}

src/daemon/buffer.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub struct LogEntry {
2121
pub struct LogBuffer {
2222
entries: VecDeque<LogEntry>,
2323
capacity: usize,
24-
seq_counter: u64, // private; exposed via peek_next_seq()
24+
next_seq: u64,
2525
pub dropped: u64,
2626
pub total_bytes: u64,
2727
}
@@ -31,15 +31,21 @@ impl LogBuffer {
3131
Self {
3232
entries: VecDeque::with_capacity(capacity.min(1024)),
3333
capacity,
34-
seq_counter: 0,
34+
next_seq: 0,
3535
dropped: 0,
3636
total_bytes: 0,
3737
}
3838
}
3939

40+
#[allow(dead_code)]
4041
pub fn push(&mut self, stream: Stream, line: String, ts: DateTime<Utc>) -> u64 {
41-
let seq = self.seq_counter;
42-
self.seq_counter += 1;
42+
let seq = self.next_seq;
43+
self.push_with_seq(seq, stream, line, ts);
44+
seq
45+
}
46+
47+
pub fn push_with_seq(&mut self, seq: u64, stream: Stream, line: String, ts: DateTime<Utc>) {
48+
self.next_seq = seq.saturating_add(1);
4349
if stream != Stream::System {
4450
self.total_bytes += line.len() as u64 + 1;
4551
}
@@ -53,7 +59,6 @@ impl LogBuffer {
5359
stream,
5460
line,
5561
});
56-
seq
5762
}
5863

5964
/// Returns last `limit` entries (newest).
@@ -86,8 +91,9 @@ impl LogBuffer {
8691
}
8792

8893
/// The seq number that the next push() call will assign.
94+
#[allow(dead_code)]
8995
pub fn peek_next_seq(&self) -> u64 {
90-
self.seq_counter
96+
self.next_seq
9197
}
9298
}
9399

0 commit comments

Comments
 (0)