|
|
@@ -509,20 +509,24 @@ func (r *remoteWriteClient) write(ctx context.Context, ts []prompb.TimeSeries) e
|
|
|
|
|
|
func remoteWriteTimeSeries(client *remoteWriteClient, tsCh chan []prompb.TimeSeries) {
|
|
|
bo := backoff.NewBackoff("remote-write", log.Printf, time.Second*30)
|
|
|
+ // writeErr may contribute to bo's backoff schedule across tsCh read ops,
|
|
|
+ // i.e. if an unrecoverable error occurs for client.write(ctx, A), that
|
|
|
+ // should be accounted against bo prior to attempting to
|
|
|
+ // client.write(ctx, B).
|
|
|
+ var writeErr error
|
|
|
for ts := range tsCh {
|
|
|
for {
|
|
|
+ bo.BackOff(context.Background(), writeErr)
|
|
|
reqCtx, cancel := context.WithTimeout(context.Background(), time.Second*30)
|
|
|
- err := client.write(reqCtx, ts)
|
|
|
+ writeErr = client.write(reqCtx, ts)
|
|
|
cancel()
|
|
|
- // we could parse the Retry-After header, but use a simple exp
|
|
|
- // backoff for now
|
|
|
- bo.BackOff(context.Background(), err)
|
|
|
- if err == nil {
|
|
|
- break
|
|
|
- }
|
|
|
var re recoverableErr
|
|
|
- if !errors.Is(err, &re) {
|
|
|
- log.Printf("unrecoverable remote write error: %v", err)
|
|
|
+ recoverable := errors.As(writeErr, &re)
|
|
|
+ if writeErr != nil {
|
|
|
+ log.Printf("remote write error(recoverable=%v): %v", recoverable, writeErr)
|
|
|
+ }
|
|
|
+ if !recoverable {
|
|
|
+ // a nil err is not recoverable
|
|
|
break
|
|
|
}
|
|
|
}
|