Skip to content
This repository was archived by the owner on Feb 1, 2024. It is now read-only.

Commit 3e8093d

Browse files
authored
Add support for web sockets in binance, outline for binanceExchange_ws.go + GetTickerPrice() (#717) (part of #715)
* feat/add ttlMap + binanceExchangeWs + binanceExchangeWs.GetTickerPrice * patch/ remove ttlmap + log in ms + add now variable * bugfix/ deps * patch/resolve comments * patch/ revert version go-jwt-middleware * patch/ refactor Set/Get/Del * patch/ transform waitForFirstEvent to timeToWaitForFirstEvent + fix glide.lock * patch/ update glide.yaml * patch/ fix version for go-binance glide.yaml * patch/ forgot defer
1 parent 08378e1 commit 3e8093d

4 files changed

Lines changed: 371 additions & 7 deletions

File tree

glide.lock

Lines changed: 17 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

glide.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,8 @@ import:
6666
- package: github.com/denisbrodbeck/machineid
6767
version: v1.0.1
6868
- package: github.com/google/uuid
69-
version: v1.1.2
69+
version: v1.1.2
70+
- package: github.com/adshao/go-binance
71+
version: v2.3.0
72+
subpackages:
73+
- v2

plugins/binanceExchange_ws.go

Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
package plugins
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"strconv"
7+
"strings"
8+
"sync"
9+
"time"
10+
11+
"github.com/adshao/go-binance/v2"
12+
"github.com/stellar/kelp/api"
13+
"github.com/stellar/kelp/model"
14+
)
15+
16+
const (
17+
STREAM_TICKER_FMT = "%s@ticker"
18+
TTLTIME = time.Second * 3 // ttl time in seconds
19+
)
20+
21+
var (
22+
timeToWaitForFirstEvent = time.Second
23+
)
24+
25+
var (
26+
ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"}
27+
)
28+
29+
type errMissingSymbol struct {
30+
symbol string
31+
}
32+
33+
func (err errMissingSymbol) Error() string {
34+
return fmt.Sprintf("Symbol %s is missing from exchange intizialization", err.symbol)
35+
}
36+
37+
type errConversion struct {
38+
from string
39+
to string
40+
}
41+
42+
func (err errConversion) Error() string {
43+
return fmt.Sprintf("Error conversion from %s to %s", err.from, err.to)
44+
}
45+
46+
type stream struct {
47+
doneC chan struct{}
48+
stopC chan struct{}
49+
cleanup func()
50+
}
51+
52+
//Wait until the stream ends
53+
func (s stream) Wait() {
54+
55+
if s.doneC == nil {
56+
return
57+
}
58+
59+
<-s.doneC
60+
}
61+
62+
//Close the stream and cleanup any data
63+
func (s stream) Close() {
64+
if s.stopC == nil {
65+
return
66+
}
67+
s.stopC <- struct{}{}
68+
s.stopC = nil
69+
70+
if s.cleanup != nil {
71+
s.cleanup()
72+
}
73+
}
74+
75+
//mapData... struct used to data from events and timestamp when they are cached
76+
type mapData struct {
77+
data interface{}
78+
createdAt time.Time
79+
}
80+
81+
//isStatle... check if data it's stale
82+
func isStale(data mapData, ttl time.Duration) bool {
83+
84+
return time.Now().Sub(data.createdAt).Seconds() > ttl.Seconds()
85+
}
86+
87+
//struct used to cache events
88+
type mapEvents struct {
89+
data map[string]mapData
90+
mtx *sync.RWMutex
91+
}
92+
93+
//Set ... set value
94+
func (m *mapEvents) Set(key string, data interface{}) {
95+
96+
now := time.Now()
97+
98+
m.mtx.Lock()
99+
defer m.mtx.Unlock()
100+
101+
m.data[key] = mapData{
102+
data: data,
103+
createdAt: now,
104+
}
105+
106+
}
107+
108+
//Get ... get value
109+
func (m *mapEvents) Get(key string) (mapData, bool) {
110+
m.mtx.RLock()
111+
defer m.mtx.RUnlock()
112+
113+
data, isData := m.data[key]
114+
115+
return data, isData
116+
}
117+
118+
//Del ... delete cached value
119+
func (m *mapEvents) Del(key string) {
120+
m.mtx.Lock()
121+
defer m.mtx.Unlock()
122+
123+
delete(m.data, key)
124+
125+
}
126+
127+
// create new map for cache
128+
func makeMapEvents() *mapEvents {
129+
return &mapEvents{
130+
data: make(map[string]mapData),
131+
mtx: &sync.RWMutex{},
132+
}
133+
}
134+
135+
//struct used to keep all cached data
136+
type events struct {
137+
SymbolStats *mapEvents
138+
}
139+
140+
func createStateEvents() *events {
141+
events := &events{
142+
SymbolStats: makeMapEvents(),
143+
}
144+
145+
return events
146+
}
147+
148+
// subscribe for symbol@ticker
149+
func subcribeTicker(symbol string, state *mapEvents) (*stream, error) {
150+
151+
wsMarketStatHandler := func(ticker *binance.WsMarketStatEvent) {
152+
state.Set(symbol, ticker)
153+
}
154+
155+
errHandler := func(err error) {
156+
log.Printf("Error WsMarketsStat for symbol %s: %v\n", symbol, err)
157+
}
158+
159+
doneC, stopC, err := binance.WsMarketStatServe(symbol, wsMarketStatHandler, errHandler)
160+
161+
if err != nil {
162+
return nil, err
163+
}
164+
165+
return &stream{doneC: doneC, stopC: stopC, cleanup: func() {
166+
state.Del(symbol)
167+
}}, err
168+
169+
}
170+
171+
type binanceExchangeWs struct {
172+
events *events
173+
174+
streams map[string]*stream
175+
streamLock *sync.Mutex
176+
177+
assetConverter model.AssetConverterInterface
178+
delimiter string
179+
}
180+
181+
// makeBinanceWs is a factory method to make an binance exchange over ws
182+
func makeBinanceWs() (*binanceExchangeWs, error) {
183+
184+
binance.WebsocketKeepalive = true
185+
186+
events := createStateEvents()
187+
188+
beWs := &binanceExchangeWs{
189+
events: events,
190+
delimiter: "",
191+
assetConverter: model.CcxtAssetConverter,
192+
streamLock: &sync.Mutex{},
193+
streams: make(map[string]*stream),
194+
}
195+
196+
return beWs, nil
197+
}
198+
199+
//getPrceision... get precision for float string
200+
func getPrecision(floatStr string) int8 {
201+
202+
strs := strings.Split(floatStr, ".")
203+
204+
if len(strs) != 2 {
205+
log.Printf("could not get precision for float %s\n", floatStr)
206+
return 0
207+
}
208+
209+
return int8(len(strs[1]))
210+
}
211+
212+
// GetTickerPrice impl.
213+
func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[model.TradingPair]api.Ticker, error) {
214+
215+
priceResult := map[model.TradingPair]api.Ticker{}
216+
for _, p := range pairs {
217+
218+
symbol, err := p.ToString(beWs.assetConverter, beWs.delimiter)
219+
220+
if err != nil {
221+
return nil, err
222+
}
223+
224+
tickerData, isTicker := beWs.events.SymbolStats.Get(symbol)
225+
226+
if !isTicker {
227+
stream, err := subcribeTicker(symbol, beWs.events.SymbolStats)
228+
229+
if err != nil {
230+
return nil, fmt.Errorf("error when subscribing for %s: %s", symbol, err)
231+
}
232+
233+
//Store stream
234+
beWs.streamLock.Lock()
235+
beWs.streams[fmt.Sprintf(STREAM_TICKER_FMT, symbol)] = stream
236+
beWs.streamLock.Unlock()
237+
238+
//Wait for binance to send events
239+
time.Sleep(timeToWaitForFirstEvent)
240+
241+
tickerData, isTicker = beWs.events.SymbolStats.Get(symbol)
242+
243+
//We couldn't subscribe for this pair
244+
if !isTicker {
245+
return nil, fmt.Errorf("error while fetching ticker price for trading pair %s", symbol)
246+
}
247+
248+
}
249+
250+
//Show how old is the ticker
251+
log.Printf("Ticker for %s is %d milliseconds old!\n", symbol, time.Now().Sub(tickerData.createdAt).Milliseconds())
252+
253+
if isStale(tickerData, TTLTIME) {
254+
return nil, fmt.Errorf("ticker for %s symbols is older than %v", symbol, TTLTIME)
255+
}
256+
257+
tickerI := tickerData.data
258+
259+
//Convert to WsMarketStatEvent
260+
ticker, isOk := tickerI.(*binance.WsMarketStatEvent)
261+
262+
if !isOk {
263+
return nil, ErrConversionWsMarketEvent
264+
}
265+
266+
askPrice, e := strconv.ParseFloat(ticker.AskPrice, 64)
267+
if e != nil {
268+
return nil, fmt.Errorf("unable to correctly parse 'ask': %s", e)
269+
}
270+
bidPrice, e := strconv.ParseFloat(ticker.BidPrice, 64)
271+
if e != nil {
272+
return nil, fmt.Errorf("unable to correctly parse 'bid': %s", e)
273+
}
274+
lastPrice, e := strconv.ParseFloat(ticker.LastPrice, 64)
275+
if e != nil {
276+
return nil, fmt.Errorf("unable to correctly parse 'last': %s", e)
277+
}
278+
279+
priceResult[p] = api.Ticker{
280+
AskPrice: model.NumberFromFloat(askPrice, getPrecision(ticker.AskPrice)),
281+
BidPrice: model.NumberFromFloat(bidPrice, getPrecision(ticker.BidPrice)),
282+
LastPrice: model.NumberFromFloat(lastPrice, getPrecision(ticker.LastPrice)),
283+
}
284+
}
285+
286+
return priceResult, nil
287+
}
288+
289+
//Unsubscribe ... unsubscribe from binance streams
290+
func (beWs *binanceExchangeWs) Unsubscribe(stream string) {
291+
292+
beWs.streamLock.Lock()
293+
294+
if stream, isStream := beWs.streams[stream]; isStream {
295+
stream.Close()
296+
}
297+
298+
beWs.streamLock.Unlock()
299+
}

0 commit comments

Comments
 (0)