From 53e8e39c5ebfd40416345d4d28f2b0e4a745ebf5 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 26 May 2025 13:37:25 -0700 Subject: [PATCH] graph: Log the path we are accessing in IPFS errors --- graph/src/ipfs/client.rs | 96 ++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 47 deletions(-) diff --git a/graph/src/ipfs/client.rs b/graph/src/ipfs/client.rs index 90da991152a..65fb6e5ba52 100644 --- a/graph/src/ipfs/client.rs +++ b/graph/src/ipfs/client.rs @@ -14,6 +14,17 @@ use crate::ipfs::ContentPath; use crate::ipfs::IpfsError; use crate::ipfs::IpfsResult; use crate::ipfs::RetryPolicy; +use crate::util::futures::RetryConfigNoTimeout; + +fn retry_no_timeout( + retry_policy: RetryPolicy, + opname: &str, + logger: &Logger, + path: &ContentPath, +) -> RetryConfigNoTimeout { + let logger = logger.new(slog::o!("path" => path.to_string())); + retry_policy.create(opname, &logger).no_timeout() +} /// A read-only connection to an IPFS server. #[async_trait] @@ -36,19 +47,16 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult>> { - let fut = retry_policy - .create("IPFS.cat_stream", self.logger()) - .no_timeout() - .run({ - let path = path.to_owned(); + let fut = retry_no_timeout(retry_policy, "IPFS.cat_stream", self.logger(), path).run({ + let path = path.to_owned(); - move || { - let path = path.clone(); - let client = self.clone(); + move || { + let path = path.clone(); + let client = self.clone(); - async move { client.call(IpfsRequest::Cat(path)).await } - } - }); + async move { client.call(IpfsRequest::Cat(path)).await } + } + }); let resp = run_with_optional_timeout(path, fut, timeout).await?; @@ -66,25 +74,22 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult { - let fut = retry_policy - .create("IPFS.cat", self.logger()) - .no_timeout() - .run({ - let path = path.to_owned(); - - move || { - let path = path.clone(); - let client = self.clone(); - - async move { - client - .call(IpfsRequest::Cat(path)) - .await? - .bytes(Some(max_size)) - .await - } + let fut = retry_no_timeout(retry_policy, "IPFS.cat", self.logger(), path).run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { + client + .call(IpfsRequest::Cat(path)) + .await? + .bytes(Some(max_size)) + .await } - }); + } + }); run_with_optional_timeout(path, fut, timeout).await } @@ -99,25 +104,22 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult { - let fut = retry_policy - .create("IPFS.get_block", self.logger()) - .no_timeout() - .run({ - let path = path.to_owned(); - - move || { - let path = path.clone(); - let client = self.clone(); - - async move { - client - .call(IpfsRequest::GetBlock(path)) - .await? - .bytes(None) - .await - } + let fut = retry_no_timeout(retry_policy, "IPFS.get_block", self.logger(), path).run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { + client + .call(IpfsRequest::GetBlock(path)) + .await? + .bytes(None) + .await } - }); + } + }); run_with_optional_timeout(path, fut, timeout).await }