量化缠论——维克多交易策略
关于量化缠论的研究,可以看一下《缠论系统如何量化并自动交易?》
以及《【量化缠论】之分型、笔、线段识别》
《量化缠论应用之维克多1-2-3法则》
如果涉及复杂的运算,推荐使用提取Dict形式的数据回测,速度会有质的飞跃。
收益风险
源码:
import pandas as pd
from datetime import timedelta, date
def initialize(context):
g.security =
['600307.XSHG']
set_universe(g.security)
set_benchmark('600307.XSHG')
set_commission(PerTrade(buy_cost=0.0003, sell_cost=0.0013,
min_cost=5))
set_slippage(PriceRelatedSlippage())
g.n = 5 #获取几分钟k线
## 获取前几日的趋势
''' temp_data
包含处理后最后一个k线的数据
zoushi 包含关系处理中关于走势的记录
after_baohan 合并后的k线'''
g.temp_data, g.zoushi,
g.after_baohan = k_initialization(security=g.security[0],num =
10*48,n=g.n)
##
每日运行一次"根据跌幅止损"、"根据大盘跌幅止损"
run_daily(dapan_stoploss) #根据大盘止损,如不想加入大盘止损,注释此句即可
# run_daily(sell) #
根据跌幅卖出
# run_daily(sell2) #
按天亏损率止损 & 固定止盈
def handle_data(context, data):
security =
g.security
Cash =
context.portfolio.cash
hour =
context.current_dt.hour
minute =
context.current_dt.minute
n = g.n
if (hour==9 and minute
==30) or (hour==13 and minute ==00):
pass
else:
if minute%n==0:
print
'time: %s:%s' %(hour,minute)
#
获得前n分钟的k线
x =
'%sm'%n
temp_hist
= attribute_history(security[0], 1, str(x),['high', 'low'],
df=False)
#
包含关系处理
Input_k =
{'high':[],'low':[]}
Input_k['high'].append(g.temp_data['high'])
Input_k['low'].append(g.temp_data['low'])
Input_k['high'].append(temp_hist['high'][0])
Input_k['low'].append(temp_hist['low'][0])
g.temp_data, g.zoushi, g.after_baohan = recognition_baohan(Input_k,
g.zoushi, g.after_baohan)
# 分型
fenxing_type, fenxing_time, fenxing_plot, fenxing_data =
recognition_fenxing(g.after_baohan)
'''
fenxing_type 记录分型点的类型,1为顶分型,-1为底分型
fenxing_time 记录分型点的时间
fenxing_plot 记录点的数值,为顶分型去high值,为底分型去low值
fenxing_data 分型点的DataFrame值
'''
# print
fenxing_type, fenxing_time, fenxing_plot, fenxing_data
#
判断趋势是否反转,并买进
if
len(fenxing_type)>7:
if fenxing_type[0] ==
-1:
location_1 = [i for i,a in
enumerate(fenxing_type) if a==1] # 找出1在列表中的所有位置
location_2 = [i for i,a in
enumerate(fenxing_type) if a==-1] # 找出-1在列表中的所有位置
# 线段破坏
case1 = fenxing_data['low'][location_2[0]] >
fenxing_data['low'][location_2[1]]
# 线段形成
case2 = fenxing_data['high'][location_1[1]] <
fenxing_data['high'][location_1[2]] <
fenxing_data['high'][location_1[3]]
case3 = fenxing_data['low'][location_2[1]] <
fenxing_data['low'][location_2[2]] <
fenxing_data['low'][location_2[3]]
#
第i笔中底比第i+2笔顶高(扶助判断条件,根据实测加入之后条件太苛刻,很难有买入机会)
case4 = fenxing_data['low'][location_2[1]] >
fenxing_data['high'][location_1[3]]
if case1 and case2 and case3 :
# 买入
order_value(security[0],Cash)
# 每分钟亏损率止损
& 固定止盈
if
len(context.portfolio.positions) > 0:
for stock in
list(context.portfolio.positions.keys()):
price = data[stock].pre_close
avg_cost =
context.portfolio.positions[stock].avg_cost
# if (price-avg_cost)/avg_cost >= 0.2 :
#
order_target(stock, 0)
if (price-avg_cost)/avg_cost <= -0.1 :
order_target(stock, 0)
elif (price-avg_cost)/avg_cost >= 0.18
:
order_target(stock, 0)
'''下面为各类函数'''
################################################################
def sell2(context):
## 亏损率止损 &
固定止盈
if
len(context.portfolio.positions) > 0:
for stock in
list(context.portfolio.positions.keys()):
hist =
attribute_history(stock, 1, '1d', 'close',df=False)
price =
hist['close'][0]
avg_cost =
context.portfolio.positions[stock].avg_cost
if
(price-avg_cost)/avg_cost <= -0.1 :
order_target(stock, 0)
elif
(price-avg_cost)/avg_cost >= 0.2 :
order_target(stock, 0)
def dapan_stoploss(context):
##
根据局大盘止损,具体用法详见dp_stoploss函数说明
stoploss =
dp_stoploss(kernel=2, n=10, zs=0.05)
if stoploss:
if len(context.portfolio.positions)>0:
for stock
in list(context.portfolio.positions.keys()):
order_target(stock, 0)
# return
def sell(context):
# 根据跌幅卖出
if
len(context.portfolio.positions)>0:
for stock in
list(context.portfolio.positions.keys()):
hist =
attribute_history(stock, 3, '1d', 'close',df=False)
if
((1-float(hist['close'][-1]/hist['close'][0])) >= 0.15):
order_target(stock,0)
def recognition_fenxing(after_baohan):
'''
从后往前找
返回值:
fenxing_type
记录分型点的类型,1为顶分型,-1为底分型
fenxing_time
记录分型点的时间
fenxing_plot
记录点的数值,为顶分型去high值,为底分型去low值
fenxing_data
分型点的DataFrame值
'''
## 找出顶和底
temp_num = 0
#上一个顶或底的位置
temp_high = 0
#上一个顶的high值
temp_low = 0
#上一个底的low值
temp_type = 0 #
上一个记录位置的类型
end =
len(after_baohan['high'])
i = end-2
fenxing_type = [] #
记录分型点的类型,1为顶分型,-1为底分型
fenxing_time = [] #
记录分型点的时间
fenxing_plot = [] #
记录点的数值,为顶分型去high值,为底分型去low值
fenxing_data =
{'high':[],'low':[]} # 分型点的DataFrame值
while (i >= 1):
if len(fenxing_type)>8:
break
else:
case1 =
after_baohan['high'][i-1]after_baohan['high'][i+1] #顶分型
case2 =
after_baohan['low'][i-1]>after_baohan['low'][i] and
after_baohan['low'][i]
if
case1:
if temp_type == 1: #
如果上一个分型为顶分型,则进行比较,选取高点更高的分型
if after_baohan['high'][i] <=
temp_high:
i -=
1
else:
temp_high
= after_baohan['high'][i]
temp_num =
i
temp_type
= 1
i -=
4
elif temp_type == 2: #
如果上一个分型为底分型,则记录上一个分型,用当前分型与后面的分型比较,选取同向更极端的分型
if temp_low >= after_baohan['high'][i]: #
如果上一个底分型的底比当前顶分型的顶高,则跳过当前顶分型。
i -=
1
else:
fenxing_type.append(-1)
#
fenxing_time.append(after_baohan.index[temp_num].strftime("%Y-%m-%d
%H:%M:%S"))
fenxing_data['high'].append(after_baohan['high'][temp_num])
fenxing_data['low'].append(after_baohan['low'][temp_num])
fenxing_plot.append(after_baohan['high'][i])
temp_high
= after_baohan['high'][i]
temp_num =
i
temp_type
= 1
i -=
4
else:
if
(after_baohan['low'][i-2]>after_baohan['low'][i-1] and
after_baohan['low'][i-1]
temp_low =
after_baohan['low'][i]
temp_num =
i-1
temp_type
= 2
i -=
4
else:
temp_high
= after_baohan['high'][i]
temp_num =
i
temp_type
= 1
i -=
4
elif
case2:
if temp_type == 2: #
如果上一个分型为底分型,则进行比较,选取低点更低的分型
if after_baohan['low'][i] >= temp_low:
i -=
1
else:
temp_low =
after_baohan['low'][i]
temp_num =
i
temp_type
= 2
i -=
4
elif temp_type == 1: #
如果上一个分型为顶分型,则记录上一个分型,用当前分型与后面的分型比较,选取同向更极端的分型
if temp_high <= after_baohan['low'][i]: #
如果上一个顶分型的底比当前底分型的底低,则跳过当前底分型。
i -=
1
else:
fenxing_type.append(1)
#
fenxing_time.append(after_baohan.index[temp_num].strftime("%Y-%m-%d
%H:%M:%S"))
fenxing_data['high'].append(after_baohan['high'][temp_num])
fenxing_data['low'].append(after_baohan['low'][temp_num])
fenxing_plot.append(after_baohan['low'][i])
temp_low =
after_baohan['low'][i]
temp_num =
i
temp_type
= 2
i -=
4
else:
if
(after_baohan['high'][i-2]after_baohan['high'][i]):
temp_high
= after_baohan['high'][i]
temp_num =
i-1
temp_type
= 1
i -=
4
else:
temp_low =
after_baohan['low'][i]
temp_num =
i
temp_type
= 2
i -=
4
else:
i -= 1
return fenxing_type,
fenxing_time, fenxing_plot, fenxing_data
def recognition_baohan(Input_k, zoushi, after_baohan):
'''
判断两根k线的包含关系
temp_data
包含处理后最后一个k线的数据
zoushi
包含关系处理中关于走势的记录
Input_k
是temp_data与新n分钟k线的合集
zoushi: 3-持平 4-向下
5-向上
after_baohan
处理之后的k线
'''
import pandas as
pd
temp_data = {}
temp_data['high'] =
Input_k['high'][0]
temp_data['low'] =
Input_k['low'][0]
case1_1 =
temp_data['high'] > Input_k['high'][1] and temp_data['low'] <
Input_k['low'][1]# 第1根包含第2根
case1_2 =
temp_data['high'] > Input_k['high'][1] and temp_data['low'] ==
Input_k['low'][1]# 第1根包含第2根
case1_3 =
temp_data['high'] == Input_k['high'][1] and temp_data['low'] <
Input_k['low'][1]# 第1根包含第2根
case2_1 =
temp_data['high'] < Input_k['high'][1] and temp_data['low'] >
Input_k['low'][1] # 第2根包含第1根
case2_2 =
temp_data['high'] < Input_k['high'][1] and temp_data['low'] ==
Input_k['low'][1] # 第2根包含第1根
case2_3 =
temp_data['high'] == Input_k['high'][1] and temp_data['low'] >
Input_k['low'][1] # 第2根包含第1根
case3 =
temp_data['high'] == Input_k['high'][1] and temp_data['low'] ==
Input_k['low'][1] # 第1根等于第2根
case4 =
temp_data['high'] > Input_k['high'][1] and temp_data['low'] >
Input_k['low'][1] # 向下趋势
case5 =
temp_data['high'] < Input_k['high'][1] and temp_data['low'] <
Input_k['low'][1] # 向上趋势
if case1_1 or case1_2 or
case1_3:
if zoushi[-1] == 4:
temp_data['high'] = Input_k['high'][1]
else:
temp_data['low'] = Input_k['low'][1]
elif case2_1 or case2_2
or case2_3:
temp_temp = {}
temp_temp['high'] = temp_data['high']
temp_temp['low'] = temp_data['low']
temp_data['high'] = Input_k['high'][1]
temp_data['low'] = Input_k['low'][1]
if zoushi[-1] == 4:
temp_data['high'] = temp_temp['high']
else:
temp_data['low'] = temp_temp['low']
elif case3:
zoushi.append(3)
pass
elif case4:
zoushi.append(4)
after_baohan['high'].append(temp_data['high'])
after_baohan['low'].append(temp_data['low'])
temp_data['high'] = Input_k['high'][1]
temp_data['low'] = Input_k['low'][1]
elif case5:
zoushi.append(5)
after_baohan['high'].append(temp_data['high'])
after_baohan['low'].append(temp_data['low'])
temp_data['high'] = Input_k['high'][1]
temp_data['low'] = Input_k['low'][1]
return temp_data,
zoushi, after_baohan
def k_initialization(security,num = 10*48,n=5):
'''
读入回测日期之前的多日k线用以判断之前的趋势
返回值:
temp_data 包含处理后最后一个k线的数据
zoushi 包含关系处理中关于走势的记录
after_baohan 合并后的k线
'''
import pandas as
pd
x = '%sm'%n
temp_data = {}
# zoushi = {}
after_baohan = {}
t = {}
stock=security
k_data =
attribute_history(stock, num, str(x),['high', 'low'],
df=False)
## 判断包含关系
after_baohan =
{'high':[],'low':[]}
t['high'] =
k_data['high'][0]
t['low'] =
k_data['low'][0]
temp_data = t
zoushi = [3] # 3-持平 4-向下
5-向上
for i in
xrange(num):
case1_1 = temp_data['high'] >
k_data['high'][i] and temp_data['low'] < k_data['low'][i]#
第1根包含第2根
case1_2 = temp_data['high'] >
k_data['high'][i] and temp_data['low'] == k_data['low'][i]#
第1根包含第2根
case1_3 = temp_data['high'] == k_data['high'][i]
and temp_data['low'] < k_data['low'][i]# 第1根包含第2根
case2_1 = temp_data['high'] <
k_data['high'][i] and temp_data['low'] > k_data['low'][i] #
第2根包含第1根
case2_2 = temp_data['high'] <
k_data['high'][i] and temp_data['low'] == k_data['low'][i] #
第2根包含第1根
case2_3 = temp_data['high'] == k_data['high'][i]
and temp_data['low'] > k_data['low'][i] # 第2根包含第1根
case3 = temp_data['high'] == k_data['high'][i]
and temp_data['low'] == k_data['low'][i] # 第1根等于第2根
case4 = temp_data['high'] > k_data['high'][i]
and temp_data['low'] > k_data['low'][i] # 向下趋势
case5 = temp_data['high'] < k_data['high'][i]
and temp_data['low'] < k_data['low'][i] # 向上趋势
if case1_1 or case1_2 or case1_3:
if
zoushi[-1] == 4:
temp_data['high'] =
k_data['high'][i]
else:
temp_data['low'] =
k_data['low'][i]
elif case2_1 or case2_2 or case2_3:
temp_temp
= {}
temp_temp['high'] = temp_data['high']
temp_temp['low'] = temp_data['low']
temp_data['high'] = k_data['high'][i]
temp_data['low'] = k_data['low'][i]
if
zoushi[-1] == 4:
temp_data['high'] =
temp_temp['high']
else:
temp_data['low'] =
temp_temp['low']
elif case3:
zoushi.append(3)
pass
elif case4:
zoushi.append(4)
after_baohan['high'].append(temp_data['high'])
after_baohan['low'].append(temp_data['low'])
temp_data['high'] = k_data['high'][i]
temp_data['low'] = k_data['low'][i]
elif case5:
zoushi.append(5)
after_baohan['high'].append(temp_data['high'])
after_baohan['low'].append(temp_data['low'])
temp_data['high'] = k_data['high'][i]
temp_data['low'] = k_data['low'][i]
return temp_data,
zoushi, after_baohan
def dp_stoploss(kernel=2, n=10, zs=0.03):
'''
方法1:当大盘N日均线(默认60日)与昨日收盘价构成“死叉”,则发出True信号
方法2:当大盘N日内跌幅超过zs,则发出True信号
'''
#
止损方法1:根据大盘指数N日均线进行止损
if kernel == 1:
t = n+2
hist = attribute_history('000300.XSHG', t, '1d',
'close', df=False)
temp1 = sum(hist['close'][1:-1])/float(n)
temp2 = sum(hist['close'][0:-2])/float(n)
close1 = hist['close'][-1]
close2 = hist['close'][-2]
if (close2 > temp2) and (close1 <
temp1):
return
True
else:
return
False
#
止损方法2:根据大盘指数跌幅进行止损
elif kernel == 2:
hist1 = attribute_history('000300.XSHG', n,
'1d', 'close',df=False)
if
((1-float(hist1['close'][-1]/hist1['close'][0])) >= zs):
return
True
else:
return
False
加载中,请稍候......