diff --git a/graph/src/ipfs/client.rs b/graph/src/ipfs/client.rs index 90da991152a..65fb6e5ba52 100644 --- a/graph/src/ipfs/client.rs +++ b/graph/src/ipfs/client.rs @@ -14,6 +14,17 @@ use crate::ipfs::ContentPath; use crate::ipfs::IpfsError; use crate::ipfs::IpfsResult; use crate::ipfs::RetryPolicy; +use crate::util::futures::RetryConfigNoTimeout; + +fn retry_no_timeout<O: Send + Sync + 'static>( + retry_policy: RetryPolicy, + opname: &str, + logger: &Logger, + path: &ContentPath, +) -> RetryConfigNoTimeout<O, IpfsError> { + let logger = logger.new(slog::o!("path" => path.to_string())); + retry_policy.create(opname, &logger).no_timeout() +} /// A read-only connection to an IPFS server. #[async_trait] @@ -36,19 +47,16 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option<Duration>, retry_policy: RetryPolicy, ) -> IpfsResult<BoxStream<'static, IpfsResult<Bytes>>> { - let fut = retry_policy - .create("IPFS.cat_stream", self.logger()) - .no_timeout() - .run({ - let path = path.to_owned(); + let fut = retry_no_timeout(retry_policy, "IPFS.cat_stream", self.logger(), path).run({ + let path = path.to_owned(); - move || { - let path = path.clone(); - let client = self.clone(); + move || { + let path = path.clone(); + let client = self.clone(); - async move { client.call(IpfsRequest::Cat(path)).await } - } - }); + async move { client.call(IpfsRequest::Cat(path)).await } + } + }); let resp = run_with_optional_timeout(path, fut, timeout).await?; @@ -66,25 +74,22 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option<Duration>, retry_policy: RetryPolicy, ) -> IpfsResult<Bytes> { - let fut = retry_policy - .create("IPFS.cat", self.logger()) - .no_timeout() - .run({ - let path = path.to_owned(); - - move || { - let path = path.clone(); - let client = self.clone(); - - async move { - client - .call(IpfsRequest::Cat(path)) - .await? - .bytes(Some(max_size)) - .await - } + let fut = retry_no_timeout(retry_policy, "IPFS.cat", self.logger(), path).run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { + client + .call(IpfsRequest::Cat(path)) + .await? + .bytes(Some(max_size)) + .await } - }); + } + }); run_with_optional_timeout(path, fut, timeout).await } @@ -99,25 +104,22 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option<Duration>, retry_policy: RetryPolicy, ) -> IpfsResult<Bytes> { - let fut = retry_policy - .create("IPFS.get_block", self.logger()) - .no_timeout() - .run({ - let path = path.to_owned(); - - move || { - let path = path.clone(); - let client = self.clone(); - - async move { - client - .call(IpfsRequest::GetBlock(path)) - .await? - .bytes(None) - .await - } + let fut = retry_no_timeout(retry_policy, "IPFS.get_block", self.logger(), path).run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { + client + .call(IpfsRequest::GetBlock(path)) + .await? + .bytes(None) + .await } - }); + } + }); run_with_optional_timeout(path, fut, timeout).await }