下面将通过编码我们需要的货币和订单的业务对象开始。此处,货币业务对象存储所有不同的加密货币或法定货币。./models/order.py
from models.model import AbstractModel
class Currency(AbstractModel):
name: str = ''
symbol: str = ''
fiat: bool
def __init__(self, **kwargs):
super().__init__(**kwargs)
./models/order.py
from models.model import AbstractModel
class Order(AbstractModel):
BUY = 'BUY'
SELL = 'SELL'
TYPE_LIMIT = 'LIMIT'
TYPE_MARKET = 'MARKET'
TYPE_STOP_LOSS = 'STOP_LOSS'
TYPE_STOP_LOSS_LIMIT = 'STOP_LOSS_LIMIT'
TYPE_TAKE_PROFIT = 'TAKE_PROFIT'
TYPE_TAKE_PROFIT_LIMIT = 'TAKE_PROFIT_LIMIT'
TYPE_LIMIT_MAKER = 'LIMIT_MAKER'
uuid = ''
side: str = ''
type: str = TYPE_LIMIT
symbol: str = ''
currency: str = ''
asset: str = ''
price: float = 0
quantity: int = 0
test: bool = False
def __init__(self, **kwargs):
super().__init__(**kwargs)
然后构建交换抽象层,并开发我们的第一个连接器。由于这里不是编写API包装器的重点,因此为了方便起见,我们将使用非官方的Binance API包装器库python-binance(https://python-binance.readthedocs.io/en/latest/binance.html
)。./exchanges/exchange.py
import datetime
from api import utils
from abc import ABC, abstractmethod
from twisted.internet import reactor
from strategies.strategy import Strategy
from models.order import Order
class Exchange(ABC):
currency: str
asset: str
strategy: Strategy
def __init__(self, key: str, secret: str):
self.apiKey = key
self.apiSecret = secret
self.name = None
self.client = None
self.socketManager = None
self.socket = None
self.currency = ''
self.asset = ''
self.strategy = None
def set_currency(self, symbol: str):
self.currency = symbol
def set_asset(self, symbol: str):
self.asset = symbol
def set_strategy(self, strategy: Strategy):
self.strategy = strategy
def compute_symbol_pair(self):
return utils.format_pair(self.currency, self.asset)
# abstract methods
# Override to set current exchange symbol pair notation (default with _ separator currency_asset ex: eur_btc)
@abstractmethod
def get_symbol(self):
return self.compute_symbol_pair(self)
# Get current symbol ticker
@abstractmethod
def symbol_ticker(self):
pass
# Get current symbol ticker candle for given interval
@abstractmethod
def symbol_ticker_candle(self, interval):
pass
# Get current symbol historic value
@abstractmethod
def historical_symbol_ticker_candle(self, start: datetime, end=None, interval=60):
pass
# Get balance for a given currency
@abstractmethod
def get_asset_balance(self, currency):
pass
# Create an exchange order
@abstractmethod
def order(self, order: Order):
pass
# Create an exchange test order
@abstractmethod
def test_order(self, order: Order):
pass
# Check an exchange order status
@abstractmethod
def check_order(self, orderId):
pass
# Cancel an exchange order
@abstractmethod
def cancel_order(self, orderId):
pass
# WebSocket related methods
@abstractmethod
def get_socket_manager(self, purchase):
pass
@abstractmethod
def websocket_event_handler(self, msg):
pass
def start_socket(self):
print('Starting WebSocket connection...')
self.socketManager.start()
def close_socket(self):
self.socketManager.stop_socket(self.socket)
self.socketManager.close()
# properly terminate WebSocket
reactor.stop()
@abstractmethod
def start_symbol_ticker_socket(self, symbol: str):
pass
./exchanges/binance.py
from datetime import datetime
from math import floor
from binance.client import Client
from binance.enums import *
from binance.websockets import BinanceSocketManager
from api import utils
from exchanges import exchange
from models.order import Order
from models.price import Price
class Binance(exchange.Exchange):
def __init__(self, key: str, secret: str):
super().__init__(key, secret)
self.client = Client(self.apiKey, self.apiSecret)
self.name = self.__class__.__name__
def get_client(self):
return self.client
def get_symbol(self):
return self.currency + self.asset
def symbol_ticker(self):
response = self.client.get_symbol_ticker(symbol=self.get_symbol())
return Price(pair=self.get_symbol(), currency=self.currency.lower(), asset=self.asset.lower(), exchange=self.name.lower(),
current=response['price'])
def symbol_ticker_candle(self, interval=Client.KLINE_INTERVAL_1MINUTE):
return self.client.get_klines(symbol=self.get_symbol(), interval=interval)
def historical_symbol_ticker_candle(self, start: datetime, end=None, interval=Client.KLINE_INTERVAL_1MINUTE):
# Convert default seconds interval to string like "1m"
if isinstance(interval, int):
interval = str(floor(interval/60)) + 'm'
output = []
for candle in self.client.get_historical_klines_generator(self.get_symbol(), interval, start, end):
output.append(
Price(pair=self.compute_symbol_pair(), currency=self.currency.lower(), asset=self.asset.lower(), exchange=self.name.lower(),
current=candle[1], lowest=candle[3], highest=candle[2], volume=candle[5], openAt=utils.format_date(datetime.fromtimestamp(int(candle[0])/1000)))
)
return output
def get_asset_balance(self, currency):
response = self.client.get_asset_balance(currency)
return response['free']
def order(self, order: Order):
return self.client.create_order(
symbol=order.symbol,
side=order.side,
type=order.type,
timeInForce=TIME_IN_FORCE_GTC,
quantity=order.quantity,
price=order.price
)
def test_order(self, order: Order):
return self.client.create_test_order(
symbol=order.symbol,
side=order.side,
type=order.type,
timeInForce=TIME_IN_FORCE_GTC,
quantity=order.quantity,
price=order.price
)
def check_order(self, orderId):
return self.client.get_order(
symbol=self.get_symbol(),
orderId=orderId
)
def cancel_order(self, orderId):
return self.client.cancel_order(
symbol=self.get_symbol(),
orderId=orderId
)
def get_socket_manager(self):
return BinanceSocketManager(self.client)
def start_symbol_ticker_socket(self, symbol: str):
self.socketManager = self.get_socket_manager()
self.socket = self.socketManager.start_symbol_ticker_socket(
symbol=self.get_symbol(),
callback=self.websocket_event_handler
)
self.start_socket()
def websocket_event_handler(self, msg):
if msg['e'
] == 'error':
print(msg)
self.close_socket()
else:
self.strategy.set_price(
Price(pair=self.compute_symbol_pair(), currency=self.currency, asset=self.asset, exchange=self.name,
current=msg['b'], lowest=msg['l'], highest=msg['h'])
)
self.strategy.run()
现在,只需调用我们的策略启动方法,我们便拥有了一个极简而强大的交易机器人系统。使用自己的指标和策略,您可以通过传递订单来开始买卖。但是,在开始编写自己的策略代码之前,请确保对其进行测试并安全运行。所以你需要一个系统来做到这一点。在下一部分中,我们将实现回测模式,以针对交易所的历史数据来测试您的策略,并提供导入这些数据的服务。另外,我们将所有这些部分与全局配置和一些依赖项一起打包进命令行工具中。最后我们将通过查看如何对该代码进行容器化和工业化实现来进一步介绍。