社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

用 Python 和币安 API 构建数字货币交易机器人(三)

Python中文社区 • 3 年前 • 497 次点击  

欢迎来到本系列的第三部分。第一部分在这里,第二部分在这里

数据集创建

首先,让我们介绍一个新的“数据集”业务对象来对价格进行分组。

./models/dataset.py

from datetime import datetime

from api import utils
from models.model import AbstractModel
from models.exchange import Exchange
from models.currency import Currency


class Dataset(AbstractModel):
    resource_name = 'datasets'

    pair: str = ''
    exchange: str = ''
    period_start: str = ''
    period_end: str = ''
    currency: str = ''
    asset: str = ''

    relations = {'exchange': Exchange, 'currency': Currency, 'asset': Currency}

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.pair = self.get_pair()

    def get_pair(self):
        return utils.format_pair(self.currency, self.asset)

引入服务对象

我们需要构建一个服务来解析和加载Binance交易所或任何其他具有API和此类历史行情自动收录器端点的历史数据。

./services/importer.py

import sys
from datetime import datetime
from models.dataset import Dataset


class Importer:
    def __init__(self, exchange, period_start: datetime, period_end=None, interval=60, *args, **kwargs):
        self.exchange = exchange
        self.interval = interval
        self.period_start = period_start
        self.period_end = period_end
        self.start = datetime.now()
        self.dataset = Dataset().create(
            data={'exchange''/api/exchanges/'+self.exchange.name.lower(), 'periodStart': self.period_start, 'periodEnd': self.period_end,
                  'candleSize'60,
                  'currency''/api/currencies/'+self.exchange.currency.lower(), 'asset''/api/currencies/'+self.exchange.asset.lower()})

    def process(self):
        for price in self.exchange.historical_symbol_ticker_candle(self.period_start, self.period_end, self.interval):
            print(price.create({'dataset''/api/datasets/'+self.dataset.uuid}))

        execution_time = datetime.now() - self.start
        print('Execution time: ' + str(execution_time.total_seconds()) + ' seconds')
        sys.exit()

这项服务职责非常简单明了,他的名字说明了一切,我们从交易所导入和存储历史行情自动收录器数据。

在这里,您可以将对象直接存储在诸如PostgreSQL之类的关系数据库中,也可以构建内部REST API并将其用作数据库的代理,以实现高性能。

回测

回测是编写您未来的机器人并根据历史行情自动收录器数据针对所有市场情况进行测试的最重要工具。

为此,我们将创建一个回测服务,他的职责是从您当前的本地数据中加载数据集,如果找不到,则直接从交易所加载(默认情况下为Binance)。然后针对历史数据集中的每个价格数据运行给定策略。

/services/backtest.py

import sys
from datetime import datetime

from exchanges.exchange import Exchange
from models.dataset import Dataset
from models.price import Price


class Backtest:
    def __init__(self, exchange: Exchange, period_start: datetime, period_end=None, interval=60):
        self.launchedAt = datetime.now()
        # Try to find dataset
        dataset = Dataset().query('get', {"exchange"'/api/exchanges/' + exchange.name.lower(),
                                          "currency"'/api/currencies/' + exchange.currency.lower(),
                                          "asset"'/api/currencies/' + exchange.asset.lower(),
                                          "period_start": period_start, "period_end": period_end, "candleSize": interval})

        if dataset and len(dataset) > 0:
            print(dataset[0])
            price = Price()
            for price in price.query('get', {"dataset": dataset[0]['uuid']}):
                newPrice = Price()
                newPrice.populate(price)
                exchange.strategy.set_price(newPrice)
                exchange.strategy.run()
        else:
            print("Dataset not found, external API call to " + exchange.name)
            for price in exchange.historical_symbol_ticker_candle(period_start, period_end, interval):
                exchange.strategy.set_price(price)
                exchange.strategy.run()

        execution_time = datetime.now() - self.launchedAt
        print('Execution time: ' + str(execution_time.total_seconds()) + ' seconds')
        sys.exit()

项目配置

我们将使用dotenv库来管理环境变量。这是项目的默认值:

./.env.local

AVAILABLE_EXCHANGES="coinbase,binance"
EXCHANGE="binance"

BINANCE_API_KEY="Your Binance API KEY"
BINANCE_API_SECRET="Your Binance API SECRET"

COINBASE_API_KEY="Your Coinbase API KEY""
COINBASE_API_SECRET="
Your Coinbase API SECRET""

# Available modes
# "trade" to trade on candlesticks
# "live" to live trade throught WebSocket
# "backtest" to test a strategy for a given symbol pair and a period
# "import" to import dataset from exchanges for a given symbol pair and a period
MODE="trade"
STRATEGY="logger"
# Allow trading "test" mode or "real" trading
TRADING_MODE="test"
# Default candle size in seconds
CANDLE_INTERVAL=60
CURRENCY="BTC"
ASSET="EUR"
# Default period for backtesting: string in UTC format
PERIOD_START="2021-02-28T08:49"
PERIOD_END="2021-03-09T08:49"

DATABASE_URL="postgresql://postgres:password@127.0.0.1:15432/cryptobot"

主线程

然后将所有这些部分放到一个主线程上,主要是使用args以及环境变量的CLI命令。

这样,我们可以覆盖任何默认环境设置,并直接使用基于命令行的客户端直接调整所有输入参数。

例如,在使用诸如Docker之类的容器化工具时,它也确实非常有用,只需启动此主线程,它将与特定容器的环境变量一起运行。

我们将根据设置动态加载和导入我们创建的每个组件。

./main.py

#!/usr/bin/python3

import importlib
import signal
import sys
import threading
from decouple import config

from services.backtest import Backtest
from services.importer import Importer

exchange_name = config('EXCHANGE')
available_exchanges = config('AVAILABLE_EXCHANGES').split(',')
mode: str = config('MODE')
strategy: str = config('STRATEGY')
trading_mode: str = config('TRADING_MODE')
interval: int = int(config('CANDLE_INTERVAL'))
currency: str = config('CURRENCY')
asset: str = config('ASSET')

if trading_mode == 'real':
    print("*** Caution: Real trading mode activated ***")
else:
    print("Test mode")

# Parse symbol pair from first command argument
if len(sys.argv) > 1:
    currencies = sys.argv[1].split('_')
    if len(currencies) > 1:
        currency = currencies[0]
        asset = currencies[1]

# Load exchange
print("Connecting to {} exchange...".format(exchange_name[0].upper() + exchange_name[1:]))
exchangeModule = importlib.import_module('exchanges.' + exchange_name, package=None)
exchangeClass = getattr(exchangeModule, exchange_name[0].upper() + exchange_name[1:])
exchange = exchangeClass(config(exchange_name.upper() + '_API_KEY'), config(exchange_name.upper() + '_API_SECRET'))

# Load currencies
exchange.set_currency(currency)
exchange.set_asset(asset)

# Load strategy
strategyModule = importlib.import_module('strategies.' + strategy, package=None)
strategyClass = getattr(strategyModule, strategy[0].upper() + strategy[1:])
exchange.set_strategy(strategyClass(exchange, interval))

# mode
print("{} mode on {} symbol".format(mode, exchange.get_symbol()))
if mode == 'trade':
    exchange.strategy.start()

elif mode == 'live':
    exchange.start_symbol_ticker_socket(exchange.get_symbol())

elif mode == 'backtest':
    period_start = config('PERIOD_START')
    period_end = config('PERIOD_END')

    print(
        "Backtest period from {} to {} with {} seconds candlesticks.".format(
            period_start,
            period_end,
            interval
        )
    )
    Backtest(exchange, period_start, period_end, interval)

elif mode == 'import':
    period_start = config('PERIOD_START')
    period_end = config('PERIOD_END')

    print(
        "Import mode on {} symbol for period from {} to {} with {} seconds candlesticks.".format(
            exchange.get_symbol(),
            period_start,
            period_end,
            interval
        )
    )
    importer = Importer(exchange, period_start, period_end, interval)
    importer.process()

else:
    print('Not supported mode.')


def signal_handler(signal, frame):
    if (exchange.socket):
        print('Closing WebSocket connection...')
        exchange.close_socket()
        sys.exit(0)
    else:
        print('stopping strategy...')
        exchange.strategy.stop()
        sys.exit(0)


# Listen for keyboard interrupt event
signal.signal(signal.SIGINT, signal_handler)
forever = threading.Event()
forever.wait()
exchange.strategy.stop()
sys.exit(0)

用法

# Real time trading mode via WebSocket
MODE=live ./main.py BTC_EUR

# Trading mode with default 1 minute candle
MODE=trade ./main.py BTC_EUR

# Import data from Exchange
MODE=import ./main.py BTC_EUR

# Backtest with an imported dataset or Binance Exchange API
MODE=backtest ./main.py BTC_EUR

您可以轻松地覆盖任何设置,如下所示:

PERIOD_START="2021-04-16 00:00" PERIOD_END="2021-04-16 00:00" STRATEGY=myCustomStrategy MODE=backtest ./main.py BTC_EUR

要退出测试模式并进行真实交易,只需将“ trading_mode”从“ test”切换为“ real”。谨慎使用,后果自负。

TRADING_MODE=real ./main.py BTC_EUR

集成化项目

我们可以使用Docker对该程序进行容器化。这是一个简单的自我解释Docker构建文件的例子。

FROM python:3.9

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python""./main.py" ]

基准

使用带有16go DDR3 ram的旧AMD Phenom II 955四核CPU,并运行其他进程。

导入

导入价格并将其部署到内部API

1天的行情

Execution time: 82.716666 seconds

一周的行情

Execution time: 9,423079183 minutes

一个月的行情

Execution time: 27,48139456 minutes

六个月的行情

Execution time: 3.032364739 hours

回测

从导入的数据集

1天的行情

Execution time: 3.746787 seconds

一周的行情

Execution time: 46.900068 seconds

一个月的行情

Execution time: 1.8953 seconds

六个月的行情

Execution time: 12,15175435 minutes

结论

我们构建了一个kickass性能的实时加密交易机器人。他能够使用少量的CPU和RAM在大型市场数据集上对您的策略进行回测。从交易所导入数据集,甚至使用WebSocket实时执行实时交易。

更进一步

  • 编写一个涵盖所有程序行为的测试套件,以确保将来不会退化。

  • 构建并使用内部Rest API实时持久保存所有加密货币交易所市场数据。

  • 建立最终用户客户端,例如移动应用或网络应用。使用WebSocket或服务器发送事件,以显示实时指标。

感谢您阅读这三部分的文章,内容涉及如何使用python 3和Binance API构建加密机器人。



欢迎添加下方二维码进群交流
Python数字货币量化交易技术

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/114755
 
497 次点击