干货 | 数据管理:用python从零实现基于事件驱动的量化回测(Backtest)系统(三)

标签:
ai股票金融量化投资 |
分类: python从零实现AI量化投资系统 |
前面两篇文章,分别讲述了基于事件驱动(Event-driven)的量化回测系统的层次结构,以及事件类型。本文重点讲述市场数据是如何在回测系统以及实盘中使用的。
我们很重要的目标是要实现最大化在回测和实盘间复用代码,避免开发两套系统,也有效避免回测过程与实盘不一样。这样DataHandler提供数据,Strategy产生信号以及Porfolio类处理订单都应该使用一致的数据接口。
DataHandler是一个数据管理基类,所以子类比如基于CSV文件提供数据是CSVDataHandler或者MongodbDataHandler等,替换这个数据处理器,并不会影响Strategy或Porfolio,这样就相对容易实现功能扩展。
下面我们开始实现DataHandler。
首先我们要导入相应的头文件,包括datetime,pandas等,基类有两个抽象接口,是子类必须实现的。
-
update_bars:在循环中每调用一次,时间会往前走一个周期,如果是日线则就是一天,相当于新的一天收市。
-
get_latest_bars:是在策略计算中,可以取最近的N个bars。用于各种指标计算,比如MA(5)等
# data.py
import datetime
import os, os.path
import pandas as pd
import copy
from abc import ABCMeta, abstractmethod
from aiquant.engine.event import BarEvent
'''
DataHandler是一个抽象数据处理类,所以实际数据处理类都继承于此(包含历史回测、实盘)
'''
class DataHandler(object):
__metaclass__ = ABCMeta
@abstractmethod
def get_latest_bars(self, symbol, N=1):
"""
返回最近的N个Bar,如果当前Bar的数量不足N,则有多少就返回多少
""" raise NotImplementedError("没有实现 get_latest_bars()")
@abstractmethod
def update_bars(self):
"""
把symbol列表里所有symbol最近的Bar导入
""" raise NotImplementedError("没有实现 update_bars()")
为简单起见,本文先实现一个数据保存的csv里的数据管理器。
class CSVDataHandler(DataHandler):
def __init__(self,events,csv_dir,symbol_list):
self.b_continue_backtest = True
self.events = events
#symbol_list:传入要处理的symbol列表集合,list类型
self.symbol_list = symbol_list
self.symbol_list_with_benchmark = copy.deepcopy(self.symbol_list)
self.symbol_list_with_benchmark.append('000300')
self.csv_dir = csv_dir
self.symbol_data = {} #symbol_data,{symbol:DataFrame}
self.latest_symbol_data = {}#最新的bar:{symbol:[bar1,bar2,barNew]}
self.b_continue_backtest = True
self._open_convert_csv_files()
def _open_convert_csv_files(self):
comb_index = None
s in self.symbol_list_with_benchmark: for
# 加载csv文件,date,OHLC,Volume
self.symbol_data[s] = pd.read_csv(
os.path.join(self.csv_dir, '%s.csv' % s),
header=0, index_col=0,parse_dates=False,
names=['date', 'open', 'high', 'low', 'close', 'volume']
).sort_index()
# Combine the index to pad forward values
if comb_index is None:
comb_index = self.symbol_data[s].index
else:
#这里要赋值,否则comb_index还是原来的index
comb_index = comb_index.union(self.symbol_data[s].index)
# 设置latest symbol_data 为 None
self.latest_symbol_data[s] = []
# Reindex the dataframes
for s in self.symbol_list_with_benchmark:
#这是一个发生器iterrows[index,series],用next(self.symbol_data[s])
方式,就是用前一天的数据再填充这一天的丢失,对于资本市场这是合理的,比如这段时间停牌。那就是按停牌前一天的价格数据来计算。 #pad
self.symbol_data[s] = self.symbol_data[s].reindex(index=comb_index, method='pad').iterrows()
def _get_new_bar(self, symbol):
"""
row = (index,series),row[0]=index,row[1]=[OHLCV] """ row = next(self.symbol_data[symbol])
#return tuple(symbol,row[0],row[1][0],row[1][1],row[1][2],row[1][3],row[1][4]])
row_dict = {'symbol':symbol,'date':row[0],'open':row[1][0],'high':row[1][1],'low':row[1][2],'close':row[1][3]}
return row_dict
def update_bars(self):
"""
Pushes the latest bar to the latest_symbol_data structure for all symbols in the symbol list. """ for s in self.symbol_list_with_benchmark:
try:
bar = self._get_new_bar(s)
print(bar)
except StopIteration:
self.b_continue_backtest = False
: else
if bar is not None:
self.latest_symbol_data[s].append(bar)
self.events.put(BarEvent())
def get_latest_bars(self, symbol, N=1):
"""
Returns the last N bars from the latest_symbol list, or N-k if less available. """ try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
else:
return bars_list[-N:]
后续还需要扩展到真实环境,比如直接从线上的mongodb里访问数据。
关于作者:魏佳斌,互联网产品/技术总监,北京大学光华管理学院(MBA),特许金融分析师(CFA)。深度关注互联网发展趋势,AI金融量化。致力于使用最新的人工智能技术去理解经济、金融,实现信息增值。
扫描下方二维码,关注:AI量化实验室(ailabx),了解AI量化最前沿技术、资讯。
http://s3/mw690/001rxTlvzy7eCeDre8ya2&690|
喜欢
0
赠金笔