-
Notifications
You must be signed in to change notification settings - Fork 693
Expand file tree
/
Copy pathazure_helpers.go
More file actions
167 lines (139 loc) · 4.65 KB
/
azure_helpers.go
File metadata and controls
167 lines (139 loc) · 4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package azure
import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/cristalhq/hedgedhttp"
"github.com/grafana/tempo/tempodb/backend/instrumentation"
)
const (
maxRetries = 1
)
func getContainerClient(ctx context.Context, cfg *Config, hedge bool) (*container.Client, error) {
var err error
retry := policy.RetryOptions{
MaxRetries: maxRetries,
// The values for TryTimeout, RetryDelay and MaxRetryDelay are inherited from the old Azure SDK
// (azure-storage-blob-go).
//
// See https://github.com/Azure/azure-storage-blob-go/blob/905b628ceb292e8d769ae62fb7cc5c5e949360db/azblob/zc_policy_retry.go#L89.
TryTimeout: 1 * time.Minute,
RetryDelay: 4 * time.Second,
MaxRetryDelay: 120 * time.Second,
}
if deadline, ok := ctx.Deadline(); ok {
retry.TryTimeout = time.Until(deadline)
}
customTransport := http.DefaultTransport.(*http.Transport).Clone()
// Default MaxIdleConnsPerHost is 2, increase that to reduce connection turnover
customTransport.MaxIdleConnsPerHost = 100
// set total max idle connections to a high number
customTransport.MaxIdleConns = 100
// add instrumentation
transport := instrumentation.NewTransport(customTransport)
var stats *hedgedhttp.Stats
// hedge if desired (0 means disabled)
if hedge && cfg.HedgeRequestsAt != 0 {
transport, stats, err = hedgedhttp.NewRoundTripperAndStats(cfg.HedgeRequestsAt, cfg.HedgeRequestsUpTo, transport)
if err != nil {
return nil, err
}
instrumentation.PublishHedgedMetrics(stats)
}
opts := azblob.ClientOptions{}
opts.Transport = &http.Client{Transport: transport}
opts.Retry = retry
opts.Telemetry = policy.TelemetryOptions{
ApplicationID: "Tempo",
}
accountName := getStorageAccountName(cfg)
u, err := url.Parse(fmt.Sprintf("https://%s.%s", accountName, cfg.Endpoint))
// If the endpoint doesn't start with blob. we can assume Azurite is being used
// So the endpoint should follow Azurite URL style
// https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob#emulated-storage-service-uri
if !strings.HasPrefix(cfg.Endpoint, "blob.") {
u, err = url.Parse(fmt.Sprintf("http://%s/%s", cfg.Endpoint, accountName))
}
if err != nil {
return nil, err
}
var client *azblob.Client
switch {
case cfg.UseFederatedToken:
credential, err := azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{})
if err != nil {
return nil, err
}
client, err = azblob.NewClient(u.String(), credential, &opts)
if err != nil {
return nil, err
}
case cfg.UseManagedIdentity:
var id azidentity.ManagedIDKind
if cfg.UserAssignedID != "" {
id = azidentity.ClientID(cfg.UserAssignedID)
}
// azidentity.NewManagedIdentityCredential defaults to a system-assigned identity.
// We only set options.ID if we want a user-assigned identity.
// See azidentity.ManagedIdentityCredential.
credential, err := azidentity.NewManagedIdentityCredential(&azidentity.ManagedIdentityCredentialOptions{
ID: id,
})
if err != nil {
return nil, err
}
client, err = azblob.NewClient(u.String(), credential, &opts)
if err != nil {
return nil, err
}
// If no authentication mechanism has been explicitly specified, assume shared key credential.
default:
credential, err := azblob.NewSharedKeyCredential(accountName, getStorageAccountKey(cfg))
if err != nil {
return nil, err
}
client, err = azblob.NewClientWithSharedKeyCredential(u.String(), credential, &opts)
if err != nil {
return nil, err
}
}
return client.ServiceClient().NewContainerClient(cfg.ContainerName), nil
}
func getBlobClient(ctx context.Context, conf *Config, blobName string) (*blob.Client, error) {
c, err := getContainerClient(ctx, conf, false)
if err != nil {
return nil, err
}
return c.NewBlobClient(blobName), nil
}
func CreateContainer(ctx context.Context, conf *Config) (*container.Client, error) {
c, err := getContainerClient(ctx, conf, false)
if err != nil {
return nil, err
}
_, err = c.Create(ctx, &container.CreateOptions{})
return c, err
}
func getStorageAccountName(cfg *Config) string {
accountName := cfg.StorageAccountName
if accountName == "" {
accountName = os.Getenv("AZURE_STORAGE_ACCOUNT")
}
return accountName
}
func getStorageAccountKey(cfg *Config) string {
accountKey := cfg.StorageAccountKey.String()
if accountKey == "" {
accountKey = os.Getenv("AZURE_STORAGE_KEY")
}
return accountKey
}