在本文中,我们将介绍如何为 Interactive Brokers Native Python API 提供的 EClient
和 EWrapper
类派生子类。然后,我们将提供端到端连接测试脚本,以确保我们能够与 IB 进行对话。
盈透证券(Interactive Brokers)一直是受交易者欢迎的经纪商。最初,这可能部分归因于 IB 提供了一个应用程序编程接口 (API),允许量化交易者获取市场数据并直接在代码中进行交易。许多相互竞争的经纪商花了一些时间来开发自己的 API,使 IB 在零售量化交易领域获得了合理的先发优势。
虽然最初的 IB API 以接口复杂而著称,但近年来随着 IB Native Python API 库的发布,情况发生了变化。
在这个新系列文章中,我们将使用 ibapi
库,来解释如何通过“原生 Python”接口与Interactive Brokers API
交互。
最终我们将学习如何请求市场数据、定义合同和处理订单。本文将重点介绍接口本身并测试基本连接。
本文假设您有一个可运行的 Python 虚拟环境(例如 Anaconda 个人版)并已成功将 IB Python API 安装到该环境中。安装说明是特定于操作系统的。可以在Interactive Brokers API
站点上找到最新的说明。
概述
IB API 通过异步'request-response'
模型进行工作。消息通过客户端类发送到 IB 服务器(通过 Trader Workstation
或IB Gateway
),而响应(称为“errors”
)由包装类单独处理。
大部分内部连接处理都通过 Python API 从最终用户那里抽取出来,允许最少必要的'boilerplate'
代码进行连接。但是,连接到 IB 的原始遗留机制仍然部分地影响了 API 的设计。因此,对于那些不习惯面向对象设计原则的人来说,这可能会令人困惑。
虽然最初似乎不清楚所有组件如何组合在一起,但在对以下类进行编码后,应该开始了解 API 的构建方式。
为了连接到 IB API,需要对四个主要组件进行编码。
第一个是 IB API EWrapper
类的派生子类。 EWrapper
用于处理来自 IB 服务器的所有响应('errors'
)。
第二个是IB API EClient
类的派生子类。 EClient
用于将所有消息发送到 IB 服务器。
第三个是从 EWrapper
和 EClient
派生的子类的多重继承类,用作 IB API 应用程序的基础,它将所有通讯联系在一起。
最后会有一个 if __name__ == "__main__":
入口点,旨在允许从命令行执行脚本。
初始化
第一步是导入脚本中使用的必要库组件。
我们将需要 IB API EWrapper
和 EClient
类,这将在下面描述。我们还需要分别来自线程 和队列库的Thread
和Queue
标准库组件。最后,我们将导入 datetime
以将 Unix 时间戳转换为更易读的格式:
# ib_api_connection.py
import datetime
import queue
import threading
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
我们现在可以定义 IBAPIWrapper
类。
IBAPIWrapper
类
EWrapper
类提供了一个接口处理来自 IB 服务器的响应(描述为'errors'
)。接口指定可以在派生子类中实现的方法。通过继承这个类,我们可以覆盖这些方法以适应我们自己特定的数据处理方法。
让我们首先创建 EWrapper
的 IBAPIWrapper
派生子类并覆盖一些方法。显示此组件的完整代码段,并将依次介绍每种方法:
# ib_api_connection.py
class IBAPIWrapper(EWrapper):
"""
A derived subclass of the IB API EWrapper interface
that allows more straightforward response processing
from the IB Gateway or an instance of TWS.
"""
def init_error(self):
"""
Place all of the error messages from IB into a
Python queue, which can be accessed elsewhere.
"""
error_queue = queue.Queue()
self._errors = error_queue
def is_error(self):
"""
Check the error queue for the presence
of errors.
Returns
-------
`boolean`
Whether the error queue is not empty
"""
return not self._errors.empty()
def get_error(self, timeout=5):
"""
Attempts to retrieve an error from the error queue,
otherwise returns None.
Parameters
----------
timeout : `float`
Time-out after this many seconds.
Returns
-------
`str` or None
A potential error message from the error queue.
"""
if self.is_error():
try:
return self._errors.get(timeout=timeout)
except queue.Empty:
return None
return None
def error(self, id, errorCode, errorString):
"""
Format the error message with appropriate codes and
place the error string onto the error queue.
"""
error_message = (
"IB Error ID (%d), Error Code (%d) with "
"response '%s'" % (id, errorCode, errorString)
)
self._errors.put(error_message)
def init_time(self):
"""
Instantiates a new queue to store the server
time, assigning it to a 'private' instance
variable and also returning it.
Returns
-------
`Queue`
The time queue instance.
"""
time_queue = queue.Queue()
self._time_queue = time_queue
return time_queue
def currentTime(self, server_time):
"""
Takes the time received by the server and
appends it to the class instance time queue.
Parameters
----------
server_time : `str`
The server time message.
"""
self._time_queue.put(server_time)
init_error
的任务是创建一个 Python Queue
队列并将其附加一个名为_errors
的“私有”实例变量。该队列将在整个类中用于存储 IB 错误消息以供以后处理。
is_error
是一个简单的方法,用于判断_errors
队列是否为空。
get_error
尝试从队列中检索错误消息,规定的超时时间以秒为单位。如果队列为空或超时,则该方法不会返回任何内容。
error
将提供的错误代码与错误消息一起格式化为适当的字符串格式,然后将其放入 _errors
队列。此方法用于在针对 API 执行代码时在控制台上提供更好的调试信息。
这四种方法完成了对来自盈透证券的响应('errors'
)的处理。需要注意的是,ibapi
库中有很多内部机器执行此处理。从我们的派生子类中无法直接看到大部分工作。
其余两个方法 init_time
和 currentTime
用于执行连接“健全性检查”('sanity check'
)。确定我们是否连接成功的一种简单方法是检索 IB 服务器上的本地时间。
这两种方法只是创建一个新队列来存储服务器时间消息,并在请求时将新时间消息放置到这个队列中。
我们的 EWrapper
简单子类到此结束。我们现在能够处理来自 IB 服务器的某些响应。下一个任务是实际向 IB 服务器发送消息。为此,我们需要覆盖 EClient
类。
IBAPIClient
类
EClient
的 IBAPIClient
派生子类用于向 IB 服务器发送消息。
需要特别注意的是,我们派生子类的构造函数 __init__
方法接受一个包装参数,然后将其传递给EClient
父构造函数。这意味着在 IBAPIClient
类中没有覆盖本地 IB API 方法。相反,我们提供了包装器实例(从 IBAPIWrapper
实例化)来处理响应。
# ib_api_connection.py
class IBAPIClient(EClient):
"""
Used to send messages to the IB servers via the API. In this
simple derived subclass of EClient we provide a method called
obtain_server_time to carry out a 'sanity check' for connection
testing.
Parameters
----------
wrapper : `EWrapper` derived subclass
Used to handle the responses sent from IB servers
"""
MAX_WAIT_TIME_SECONDS = 10
def __init__(self, wrapper):
EClient.__init__(self, wrapper)
def obtain_server_time(self):
"""
Requests the current server time from IB then
returns it if available.
Returns
-------
`int`
The server unix timestamp.
"""
# Instantiate a queue to store the server time
time_queue = self.wrapper.init_time()
# Ask IB for the server time using the EClient method
self.reqCurrentTime()
# Try to obtain the latest server time if it exists
# in the queue, otherwise issue a warning
try:
server_time = time_queue.get(
timeout=IBAPIClient.MAX_WAIT_TIME_SECONDS
)
except queue.Empty:
print(
"Time queue was empty or exceeded maximum timeout of "
"%d seconds" % IBAPIClient.MAX_WAIT_TIME_SECONDS
)
server_time = None
# Output all additional errors, if they exist
while self.wrapper.is_error():
print(self.get_error())
return server_time
在obtain_server_time
中,我们首先创建一个队列来保存来自服务器的时间戳消息。然后我们调用原生 EClient
方法 reqCurrentTime
从服务器获取时间。
随后,我们在 try...except
块中包装了一个从时间队列中获取值的调用。我们提供10秒的超时时间。如果超时或队列为空,我们将服务器时间设置为None
。
我们运行一个 while 循环来检查 EWrapper
派生子类中定义的错误队列中的任何其他响应。如果它们存在,它们将打印到控制台。
最后我们返回服务器时间。
下一阶段是创建一种机制来实例化 IBAPIWrapper
和 IBAPIClient
,以及实际连接到 IB 服务器。
IBAPIApp
本文中要派生的最后一个类是 IBAPIApp 类。
此类利用多重继承从 IBAPIWrapper
和 IBAPIClient
类继承。在初始化时,这两个类也被初始化。但是,请注意 IBAPIClient
类将 wrapper=self
作为初始化关键字参数,因为 IBAPIApp
也是从 IBAPIWrapper
派生的。
在初始化两个父类之后,使用适当的连接参数调用connect
原生方法。
下一部分代码初始化应用程序所需的各种线程。客户端实例有一个线程,另一个用于将响应消息添加到各个队列。
最后调用 init_error
方法开始监听 IB 响应。
# ib_api_connection.py
class IBAPIApp(IBAPIWrapper, IBAPIClient):
"""
The IB API application class creates the instances
of IBAPIWrapper and IBAPIClient, through a multiple
inheritance mechanism.
When the class is initialised it connects to the IB
server. At this stage multiple threads of execution
are generated for the client and wrapper.
Parameters
----------
ipaddress : `str`
The IP address of the TWS client/IB Gateway
portid : `int`
The port to connect to TWS/IB Gateway with
clientid : `int`
An (arbitrary) client ID, that must be a positive integer
"""
def __init__(self, ipaddress, portid, clientid):
IBAPIWrapper.__init__(self)
IBAPIClient.__init__(self, wrapper=self)
# Connects to the IB server with the
# appropriate connection parameters
self.connect(ipaddress, portid, clientid)
# Initialise the threads for various components
thread = threading.Thread(target=self.run)
thread.start()
setattr(self, "_thread", thread)
# Listen for the IB responses
self.init_error()
现在定义了前三个类,我们就可以创建脚本入口点了。
执行代码
我们首先设置连接参数,包括主机 IP 地址、连接到 TWS/IB
网关的端口和(任意)正整数客户端 ID。
然后我们使用适当的连接参数实例化一个应用程序实例。
我们使用该应用程序从 IB 获取服务器时间,然后使用 datetime
库的 utcfromtimestamp
方法将 Unix 时间戳适当地格式化为更具可读性的日期格式。
最后我们断开与IB服务器的连接并结束程序。
# ib_api_connection.py
if __name__ == '__main__':
# Application parameters
host = '127.0.0.1' # Localhost, but change if TWS is running elsewhere
port = 7497
# Change to the appropriate IB TWS account port number
client_id = 1234
print("Launching IB API application...")
# Instantiate the IB API application
app = IBAPIApp(host, port, client_id)
print("Successfully launched IB API application...")
# Obtain the server time via the IB API app
server_time = app.obtain_server_time()
server_time_readable = datetime.datetime.utcfromtimestamp(
server_time
).strftime('%Y-%m-%d %H:%M:%S')
print("Current IB server time: %s" % server_time_readable)
# Disconnect from the IB server
app.disconnect()
print("Disconnected from the IB API application. Finished.")
在这个阶段,我们准备运行 ib_api_connection.py
。只需导航到您存储文件的目录,确保带有 ibapi
的虚拟环境处于活动状态,并且 TWS(或 IB 网关)已正确加载和配置,然后键入以下内容:
python ib_api_connection.py
您应该会看到类似于以下内容的输出:
Launching IB API application...
Successfully launched IB API application...
IB Error ID (-1), Error Code (2104) with response 'Market data farm connection is OK:usfarm'
IB Error ID (-1), Error Code (2106) with response 'HMDS data farm connection is OK:ushmds'
IB Error ID (-1), Error Code (2158) with response 'Sec-def data farm connection is OK:secdefnj'
Current IB server time: 2020-07-29 13:27:18
Disconnected from the IB API application. Finished.
unhandled exception in EReader thread
Traceback (most recent call last):
File "/home/mhallsmoore/venv/qstrader/lib/python3.6/site-packages/ibapi/reader.py", line 34, in run
data = self.conn.recvMsg()
File "/home/mhallsmoore/venv/qstrader/lib/python3.6/site-packages/ibapi/connection.py", line 99, in recvMsg
buf = self._recvAllMsg()
File "/home/mhallsmoore/venv/qstrader/lib/python3.6/site-packages/ibapi/connection.py", line 119, in _recvAllMsg
buf = self.socket.recv(4096)
OSError: [Errno 9] Bad file descriptor
第一组输出是带有代码 2104、2106 和 2158 的 IB 'errors'
。这些实际上是说明与各种服务器的连接正常运行的响应。也就是说,它们不是'errors'
!
服务器时间也从 Unix 时间戳正确转换为更易读的格式和输出。在此阶段,应用程序断开连接。
但是请注意,在 EReader
线程中引发了 OSError
异常。这是 IB API 本身的一个内部问题,目前还没有一个修复程序。出于本教程的目的,它可以被忽略。
现在完成了连接到 IB Python Native API 的教程。 ib_api_connection.py
的完整代码请扫描下方二维码获取。
我们已经成功连接到IB服务器,并通过调用检查连接,获取当前服务器时间。后面我们将确定如何从 IB API 检索股票的市场数据。
↓↓长按扫码获取完整源码↓↓