Skip to content

Commit 23abb5a

Browse files
pree-dewdmathieu
andauthored
Parse errormsgs in retryable status codes (#5541)
Fixes #5536 --------- Co-authored-by: Damien Mathieu <[email protected]>
1 parent 30cc379 commit 23abb5a

File tree

7 files changed

+190
-20
lines changed

7 files changed

+190
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
2222
- Correct the `Tracer` names used in `go.opentelemetry.io/otel/example/passthrough`. (#5612)
2323
- Correct the `Meter` name used in `go.opentelemetry.io/otel/example/prometheus`. (#5612)
2424
- Correct the `Tracer` names used in `go.opentelemetry.io/otel/example/zipkin`. (#5612)
25+
- Pass the underlying error rather than a generic retry-able failure in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`, `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp` and `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5541)
2526

2627
<!-- Released section -->
2728
<!-- Don't change this section unless doing release -->

exporters/otlp/otlplog/otlploghttp/client.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/http"
1515
"net/url"
1616
"strconv"
17+
"strings"
1718
"sync"
1819
"time"
1920

@@ -143,7 +144,7 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
143144
resp, err := c.client.Do(request.Request)
144145
var urlErr *url.Error
145146
if errors.As(err, &urlErr) && urlErr.Temporary() {
146-
return newResponseError(http.Header{})
147+
return newResponseError(http.Header{}, err)
147148
}
148149
if err != nil {
149150
return err
@@ -184,13 +185,25 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
184185
sc == http.StatusServiceUnavailable,
185186
sc == http.StatusGatewayTimeout:
186187
// Retry-able failure.
187-
rErr = newResponseError(resp.Header)
188+
rErr = newResponseError(resp.Header, nil)
188189

189-
// Going to retry, drain the body to reuse the connection.
190-
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
190+
// server may return a message with the response
191+
// body, so we read it to include in the error
192+
// message to be returned. It will help in
193+
// debugging the actual issue.
194+
var respData bytes.Buffer
195+
if _, err := io.Copy(&respData, resp.Body); err != nil {
191196
_ = resp.Body.Close()
192197
return err
193198
}
199+
200+
// overwrite the error message with the response body
201+
// if it is not empty
202+
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
203+
// Include response for context.
204+
e := errors.New(respStr)
205+
rErr = newResponseError(resp.Header, e)
206+
}
194207
default:
195208
rErr = fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status)
196209
}
@@ -266,24 +279,50 @@ func (r *request) reset(ctx context.Context) {
266279
// retryableError represents a request failure that can be retried.
267280
type retryableError struct {
268281
throttle int64
282+
err error
269283
}
270284

271285
// newResponseError returns a retryableError and will extract any explicit
272-
// throttle delay contained in headers.
273-
func newResponseError(header http.Header) error {
286+
// throttle delay contained in headers. The returned error wraps wrapped
287+
// if it is not nil.
288+
func newResponseError(header http.Header, wrapped error) error {
274289
var rErr retryableError
275290
if v := header.Get("Retry-After"); v != "" {
276291
if t, err := strconv.ParseInt(v, 10, 64); err == nil {
277292
rErr.throttle = t
278293
}
279294
}
295+
296+
rErr.err = wrapped
280297
return rErr
281298
}
282299

283300
func (e retryableError) Error() string {
301+
if e.err != nil {
302+
return fmt.Sprintf("retry-able request failure: %v", e.err.Error())
303+
}
304+
284305
return "retry-able request failure"
285306
}
286307

308+
func (e retryableError) Unwrap() error {
309+
return e.err
310+
}
311+
312+
func (e retryableError) As(target interface{}) bool {
313+
if e.err == nil {
314+
return false
315+
}
316+
317+
switch v := target.(type) {
318+
case **retryableError:
319+
*v = &e
320+
return true
321+
default:
322+
return false
323+
}
324+
}
325+
287326
// evaluate returns if err is retry-able. If it is and it includes an explicit
288327
// throttling delay, that delay is also returned.
289328
func evaluate(err error) (bool, time.Duration) {

exporters/otlp/otlplog/otlploghttp/client_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,30 @@ func TestConfig(t *testing.T) {
700700
assert.Len(t, rCh, 0, "failed HTTP responses did not occur")
701701
})
702702

703+
t.Run("WithRetryAndExporterErr", func(t *testing.T) {
704+
exporterErr := errors.New("rpc error: code = Unavailable desc = service.name not found in resource attributes")
705+
rCh := make(chan exportResult, 1)
706+
rCh <- exportResult{Err: &httpResponseError{
707+
Status: http.StatusTooManyRequests,
708+
Err: exporterErr,
709+
}}
710+
exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{
711+
Enabled: false,
712+
}))
713+
ctx := context.Background()
714+
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
715+
// Push this after Shutdown so the HTTP server doesn't hang.
716+
t.Cleanup(func() { close(rCh) })
717+
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
718+
err := exp.Export(ctx, make([]log.Record, 1))
719+
assert.ErrorContains(t, err, exporterErr.Error())
720+
721+
// To test the `Unwrap` and `As` function of retryable error
722+
var retryErr *retryableError
723+
assert.ErrorAs(t, err, &retryErr)
724+
assert.ErrorIs(t, err, *retryErr)
725+
})
726+
703727
t.Run("WithURLPath", func(t *testing.T) {
704728
path := "/prefix/v2/logs"
705729
ePt := fmt.Sprintf("http://localhost:0%s", path)

exporters/otlp/otlpmetric/otlpmetrichttp/client.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/http"
1515
"net/url"
1616
"strconv"
17+
"strings"
1718
"sync"
1819
"time"
1920

@@ -146,7 +147,7 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
146147
resp, err := c.httpClient.Do(request.Request)
147148
var urlErr *url.Error
148149
if errors.As(err, &urlErr) && urlErr.Temporary() {
149-
return newResponseError(http.Header{})
150+
return newResponseError(http.Header{}, err)
150151
}
151152
if err != nil {
152153
return err
@@ -187,13 +188,25 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
187188
sc == http.StatusServiceUnavailable,
188189
sc == http.StatusGatewayTimeout:
189190
// Retry-able failure.
190-
rErr = newResponseError(resp.Header)
191+
rErr = newResponseError(resp.Header, nil)
191192

192-
// Going to retry, drain the body to reuse the connection.
193-
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
193+
// server may return a message with the response
194+
// body, so we read it to include in the error
195+
// message to be returned. It will help in
196+
// debugging the actual issue.
197+
var respData bytes.Buffer
198+
if _, err := io.Copy(&respData, resp.Body); err != nil {
194199
_ = resp.Body.Close()
195200
return err
196201
}
202+
203+
// overwrite the error message with the response body
204+
// if it is not empty
205+
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
206+
// Include response for context.
207+
e := errors.New(respStr)
208+
rErr = newResponseError(resp.Header, e)
209+
}
197210
default:
198211
rErr = fmt.Errorf("failed to send metrics to %s: %s", request.URL, resp.Status)
199212
}
@@ -269,24 +282,50 @@ func (r *request) reset(ctx context.Context) {
269282
// retryableError represents a request failure that can be retried.
270283
type retryableError struct {
271284
throttle int64
285+
err error
272286
}
273287

274288
// newResponseError returns a retryableError and will extract any explicit
275-
// throttle delay contained in headers.
276-
func newResponseError(header http.Header) error {
289+
// throttle delay contained in headers. The returned error wraps wrapped
290+
// if it is not nil.
291+
func newResponseError(header http.Header, wrapped error) error {
277292
var rErr retryableError
278293
if v := header.Get("Retry-After"); v != "" {
279294
if t, err := strconv.ParseInt(v, 10, 64); err == nil {
280295
rErr.throttle = t
281296
}
282297
}
298+
299+
rErr.err = wrapped
283300
return rErr
284301
}
285302

286303
func (e retryableError) Error() string {
304+
if e.err != nil {
305+
return fmt.Sprintf("retry-able request failure: %s", e.err.Error())
306+
}
307+
287308
return "retry-able request failure"
288309
}
289310

311+
func (e retryableError) Unwrap() error {
312+
return e.err
313+
}
314+
315+
func (e retryableError) As(target interface{}) bool {
316+
if e.err == nil {
317+
return false
318+
}
319+
320+
switch v := target.(type) {
321+
case **retryableError:
322+
*v = &e
323+
return true
324+
default:
325+
return false
326+
}
327+
}
328+
290329
// evaluate returns if err is retry-able. If it is and it includes an explicit
291330
// throttling delay, that delay is also returned.
292331
func evaluate(err error) (bool, time.Duration) {

exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,30 @@ func TestConfig(t *testing.T) {
192192
assert.Len(t, rCh, 0, "failed HTTP responses did not occur")
193193
})
194194

195+
t.Run("WithRetryAndExporterErr", func(t *testing.T) {
196+
exporterErr := errors.New("rpc error: code = Unavailable desc = service.name not found in resource attributes")
197+
rCh := make(chan otest.ExportResult, 1)
198+
rCh <- otest.ExportResult{Err: &otest.HTTPResponseError{
199+
Status: http.StatusTooManyRequests,
200+
Err: exporterErr,
201+
}}
202+
exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{
203+
Enabled: false,
204+
}))
205+
ctx := context.Background()
206+
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
207+
// Push this after Shutdown so the HTTP server doesn't hang.
208+
t.Cleanup(func() { close(rCh) })
209+
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
210+
err := exp.Export(ctx, &metricdata.ResourceMetrics{})
211+
assert.ErrorContains(t, err, exporterErr.Error())
212+
213+
// To test the `Unwrap` and `As` function of retryable error
214+
var retryErr *retryableError
215+
assert.ErrorAs(t, err, &retryErr)
216+
assert.ErrorIs(t, err, *retryErr)
217+
})
218+
195219
t.Run("WithURLPath", func(t *testing.T) {
196220
path := "/prefix/v2/metrics"
197221
ePt := fmt.Sprintf("http://localhost:0%s", path)

exporters/otlp/otlptrace/otlptracehttp/client.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/http"
1515
"net/url"
1616
"strconv"
17+
"strings"
1718
"sync"
1819
"time"
1920

@@ -151,7 +152,7 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
151152
resp, err := d.client.Do(request.Request)
152153
var urlErr *url.Error
153154
if errors.As(err, &urlErr) && urlErr.Temporary() {
154-
return newResponseError(http.Header{})
155+
return newResponseError(http.Header{}, err)
155156
}
156157
if err != nil {
157158
return err
@@ -198,11 +199,27 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
198199
sc == http.StatusBadGateway,
199200
sc == http.StatusServiceUnavailable,
200201
sc == http.StatusGatewayTimeout:
201-
// Retry-able failures. Drain the body to reuse the connection.
202-
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
203-
otel.Handle(err)
202+
// Retry-able failures.
203+
rErr := newResponseError(resp.Header, nil)
204+
205+
// server may return a message with the response
206+
// body, so we read it to include in the error
207+
// message to be returned. It will help in
208+
// debugging the actual issue.
209+
var respData bytes.Buffer
210+
if _, err := io.Copy(&respData, resp.Body); err != nil {
211+
_ = resp.Body.Close()
212+
return err
204213
}
205-
return newResponseError(resp.Header)
214+
215+
// overwrite the error message with the response body
216+
// if it is not empty
217+
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
218+
// Include response for context.
219+
e := errors.New(respStr)
220+
rErr = newResponseError(resp.Header, e)
221+
}
222+
return rErr
206223
default:
207224
return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
208225
}
@@ -291,24 +308,50 @@ func (r *request) reset(ctx context.Context) {
291308
// retryableError represents a request failure that can be retried.
292309
type retryableError struct {
293310
throttle int64
311+
err error
294312
}
295313

296314
// newResponseError returns a retryableError and will extract any explicit
297-
// throttle delay contained in headers.
298-
func newResponseError(header http.Header) error {
315+
// throttle delay contained in headers. The returned error wraps wrapped
316+
// if it is not nil.
317+
func newResponseError(header http.Header, wrapped error) error {
299318
var rErr retryableError
300319
if s, ok := header["Retry-After"]; ok {
301320
if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
302321
rErr.throttle = t
303322
}
304323
}
324+
325+
rErr.err = wrapped
305326
return rErr
306327
}
307328

308329
func (e retryableError) Error() string {
330+
if e.err != nil {
331+
return fmt.Sprintf("retry-able request failure: %s", e.err.Error())
332+
}
333+
309334
return "retry-able request failure"
310335
}
311336

337+
func (e retryableError) Unwrap() error {
338+
return e.err
339+
}
340+
341+
func (e retryableError) As(target interface{}) bool {
342+
if e.err == nil {
343+
return false
344+
}
345+
346+
switch v := target.(type) {
347+
case **retryableError:
348+
*v = &e
349+
return true
350+
default:
351+
return false
352+
}
353+
}
354+
312355
// evaluate returns if err is retry-able. If it is and it includes an explicit
313356
// throttling delay, that delay is also returned.
314357
func evaluate(err error) (bool, time.Duration) {

exporters/otlp/otlptrace/otlptracehttp/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func TestTimeout(t *testing.T) {
238238
assert.NoError(t, exporter.Shutdown(ctx))
239239
}()
240240
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
241-
assert.ErrorContains(t, err, "retry-able request failure")
241+
assert.ErrorContains(t, err, "context deadline exceeded")
242242
}
243243

244244
func TestNoRetry(t *testing.T) {

0 commit comments

Comments
 (0)