For investing, it comes down to two things: stock selection and market timing. Only once those are in place do capital management and position sizing come into play.
Stock selection is the prerequisite, and for each individual it is the most central and important part. There is plenty of material online, including professional quantitative research reports, with methods of every flavor. From an engineering point of view, a better start is to build a stock-selection "demo" framework we fully understand, then keep refining each step and validate our selection logic through backtests or comparison against live trading.
As an aside, I have recently been listening to some of 北京炒家's audio on investing. One key takeaway: whatever the trading style (discretionary, value investing, quantitative trading, limit-up chasing, and so on), what matters is sticking with it and continuously refining the process. With so much information around, it is easy to get lost: chasing 北京炒家 today, quant tomorrow, Tongdaxin formulas the day after. That way it is hard to ever build an investment system of one's own. (Even if we study hard and feel we have "got it", will we necessarily make money? Not necessarily. But investing blindly and earning only luck's money is certainly not the right path.)
I. Approach
Drawing on a sell-side quant research report, let's first look at the basic idea behind a simple stock-selection logic. It covers three dimensions: trend, volume-price, and capital flow; other dimensions are of course possible.
Based on this overall idea, it can be implemented through concrete indicators and conditions:
Then we build the concrete operational flow:
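The mapping from the three dimensions to concrete indicators can be sketched as a plain data structure. This is only an illustration (`SCREEN_RULES` and `dimensions_passed` are names made up here, and the thresholds are examples, not the report's exact values):

```python
# Illustrative mapping: dimension -> indicator -> screening rule (thresholds are examples)
SCREEN_RULES = {
    "trend": {"均线多头": "MA5 > MA10 > MA20 and MA5 rising"},
    "volume_price": {
        "涨幅": "2% <= daily gain <= 5%",
        "量比": "volume / 5-day average volume > 1",
        "换手率": "5% <= turnover rate <= 10%",
        "量价齐升": "volume and close both higher than yesterday",
    },
    "capital": {"资金流入": "large-order net inflow > 0 (today and 3-day sum)"},
}

def dimensions_passed(flags):
    """Given {indicator: bool} check results, count how many dimensions fully pass."""
    passed = 0
    for dim, rules in SCREEN_RULES.items():
        if rules and all(flags.get(name, False) for name in rules):
            passed += 1
    return passed
```

Keeping the rules in data like this makes it easy to tighten or relax individual thresholds later without touching the screening loop.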
II. Code
Unlike the simple indicators covered in earlier articles, which mostly rely on daily OHLCV or minute-level Kline data (both still obtainable), stock selection faces several difficulties on the data side:
Broader data coverage: fundamental data, or even alternative data (sentiment, basic corporate information, etc.), may be needed. Institutional investors can buy professional feeds from vendors such as wind, 聚源, 通联, 天软, 米筐, and 聚宽; for individuals, buying directly can be a significant expense, while free sources such as akshare or tushare offer no guarantees on data quality and stability.
Larger data volume: screening the whole market means fetching data for roughly 5,000 stocks, which puts demands on local storage, program performance, and program design. Narrowing the universe to the constituents of a specific index, such as the CSI 500 or CSI 1000, shrinks the problem considerably.
Mixed data frequencies: some indicators need longer lookbacks, some need daily price-volume data, and some need real-time monitoring (sentiment, for instance). Designing unified management for data of different frequencies is a problem in itself.
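Although the code below sets the frequency problem aside, one simple starting point is to tag each dataset with its update cadence and derive the cache lifetime from that. A minimal sketch (the names `FREQ_TTL` and `cache_expired` are hypothetical, not part of the code below; a real system would also need trading-calendar awareness):

```python
from datetime import timedelta

# Hypothetical cadence -> cache lifetime mapping; illustrates the idea only
FREQ_TTL = {
    "static":   timedelta(days=30),   # e.g. company fundamentals
    "daily":    timedelta(days=1),    # daily OHLCV
    "realtime": timedelta(minutes=1), # quotes, sentiment feeds
}

def cache_expired(age, freq):
    """Return True if data of the given cadence is older than its allowed lifetime."""
    return age >= FREQ_TTL[freq]
```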
Today's code sets these issues aside; the principle is to get one end-to-end flow working first:
1. Imports
import os
import sys
import logging
import akshare as ak
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
2. Configuration
Set the default system encoding (this must sit at the top of the file):
sys.stdout.reconfigure(encoding='utf-8')  # Python 3.7+
sys.stderr.reconfigure(encoding='utf-8')
Configuration section (parameters the user may modify):
class Config:
    CACHE_DIR = "./cache/"
    LOG_LEVEL = logging.DEBUG
    MAX_WORKERS = 8
    CACHE_EXPIRE_DAYS = 3
Initialize the AKShare session:
session = requests.Session()
ak.session = session
Logging setup:
def setup_logger():
    if not os.path.exists(Config.CACHE_DIR):
        os.makedirs(Config.CACHE_DIR)
    log_file = os.path.join(
        Config.CACHE_DIR,
        f"select_stock_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    )
    logger = logging.getLogger('StockSelector')
    logger.setLevel(Config.LOG_LEVEL)
    # File handler (UTF-8)
    file_handler = logging.FileHandler(
        filename=log_file,
        encoding='utf-8',
        mode='w'
    )
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'
    ))
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'
    ))
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

logger = setup_logger()
3. Data caching
To avoid hitting the AKShare endpoints on every run, query results are saved locally so they can be reused for research.
class DataCache:
    @staticmethod
    def get_cache_path(data_type, symbol=""):
        paths = {
            "stock_list": "stock_list.csv",
            "hist_data": f"hist/{symbol}_hist.csv",
            "float_shares": f"shares/{symbol}_shares.csv"
        }
        path = os.path.join(Config.CACHE_DIR, paths[data_type])
        # Make sure the subdirectory (hist/, shares/) exists
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

    @staticmethod
    def is_cache_valid(file_path):
        if not os.path.exists(file_path):
            return False
        file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
        return (datetime.now() - file_time).days < Config.CACHE_EXPIRE_DAYS

    @staticmethod
    def save_stock_list():
        try:
            logger.info("Fetching the stock list...")
            df = ak.stock_zh_a_spot_em()
            cache_path = DataCache.get_cache_path("stock_list")
            df.to_csv(cache_path, index=False, encoding='utf-8-sig')
            logger.info(f"Stock list cached to {cache_path}")
            return df
        except Exception as e:
            logger.error(f"Failed to fetch the stock list: {str(e)}")
            raise

    @staticmethod
    def load_stock_list():
        cache_path = DataCache.get_cache_path("stock_list")
        if os.path.exists(cache_path) and DataCache.is_cache_valid(cache_path):
            logger.info(f"Loading stock list from cache: {cache_path}")
            # Read codes as strings so leading zeros (e.g. "000001") survive
            return pd.read_csv(cache_path, encoding='utf-8-sig', dtype={'代码': str})
        return DataCache.save_stock_list()
Core selection logic (per-condition checks):
class StockSelector:
    def __init__(self):
        self.symbol_list = DataCache.load_stock_list()['代码'].tolist()
        self.end_date = datetime.now().strftime('%Y%m%d')
        # 60 calendar days back so at least 20 trading days are available for MA20
        # (the original 20 calendar days yields too few rows and every stock is rejected)
        self.start_date = (datetime.now() - timedelta(days=60)).strftime('%Y%m%d')
    def get_hist_data(self, symbol):
        cache_path = DataCache.get_cache_path("hist_data", symbol)
        if os.path.exists(cache_path) and DataCache.is_cache_valid(cache_path):
            logger.debug(f"Loading history from cache: {symbol}")
            return pd.read_csv(cache_path, parse_dates=['日期'], encoding='utf-8-sig')
        logger.info(f"Downloading history: {symbol}")
        hist_df = ak.stock_zh_a_hist(
            symbol=symbol, period="daily",
            start_date=self.start_date, end_date=self.end_date,
            adjust="qfq"
        )
        hist_df.to_csv(cache_path, index=False, encoding='utf-8-sig')
        return hist_df
    def get_float_shares(self, symbol):
        cache_path = DataCache.get_cache_path("float_shares", symbol)
        if os.path.exists(cache_path) and DataCache.is_cache_valid(cache_path):
            logger.debug(f"Loading float shares from cache: {symbol}")
            return pd.read_csv(cache_path, encoding='utf-8-sig')['流通股本'].iloc[-1]
        logger.info(f"Downloading float shares: {symbol}")
        shares_df = ak.stock_zh_a_circulate(symbol)
        shares_df.to_csv(cache_path, index=False, encoding='utf-8-sig')
        return shares_df['流通股本'].iloc[-1]
    def analyze_conditions(self, symbol):
        try:
            logger.info(f"Processing stock: {symbol}")
            hist_df = self.get_hist_data(symbol)
            if len(hist_df) < 20:
                logger.warning(f"Not enough data: {symbol} (only {len(hist_df)} days)")
                return None
            if hist_df.isnull().values.any():
                logger.warning(f"Data contains null values: {symbol}")
                return None
            # Compute the moving averages BEFORE taking the latest row,
            # otherwise latest['MA5'] below raises a KeyError
            hist_df['MA5'] = hist_df['收盘'].rolling(5).mean()
            hist_df['MA10'] = hist_df['收盘'].rolling(10).mean()
            hist_df['MA20'] = hist_df['收盘'].rolling(20).mean()
            latest = hist_df.iloc[-1]
            prev_day = hist_df.iloc[-2]
            conditions = {
                'code': symbol,
                'name': DataCache.load_stock_list().query('代码 == @symbol')['名称'].values[0],
                'conditions': {},
                'details': {}
            }
            # Condition 1: daily gain between 2% and 5%
            pct_change = (latest['收盘'] - prev_day['收盘']) / prev_day['收盘'] * 100
            conditions['conditions']['涨幅'] = 2 <= pct_change <= 5
            conditions['details']['涨幅'] = round(pct_change, 2)
            # Condition 2: volume above the previous 5-day average
            vol_mean_5d = hist_df['成交量'].iloc[-6:-1].mean()
            current_vol_ratio = latest['成交量'] / vol_mean_5d if vol_mean_5d > 0 else 0
            conditions['conditions']['量比'] = current_vol_ratio > 1
            conditions['details']['量比'] = round(current_vol_ratio, 2)
            # Condition 3: turnover rate between 5% and 10%
            # (volume is quoted in lots; 1 lot = 100 shares)
            try:
                float_shares = self.get_float_shares(symbol)
                turnover_rate = latest['成交量'] * 100 / float_shares * 100
                conditions['conditions']['换手率'] = 5 <= turnover_rate <= 10
                conditions['details']['换手率'] = round(turnover_rate, 2)
            except Exception as e:
                logger.error(f"Turnover-rate calculation failed: {symbol} - {str(e)}")
                conditions['conditions']['换手率'] = False
            # Condition 4: turnover amount between 5e9 and 2e10 CNY
            amount = latest['成交量'] * 100 * latest['收盘']
            conditions['conditions']['成交额'] = 50e8 <= amount <= 200e8
            conditions['details']['成交额(亿)'] = round(amount / 1e8, 2)
            # Condition 5: volume and price both rose versus the previous day
            conditions['conditions']['量价齐升'] = (latest['成交量'] > prev_day['成交量']) and (latest['收盘'] > prev_day['收盘'])
            conditions['details']['成交量变化'] = f"{prev_day['成交量']} → {latest['成交量']}"
            conditions['details']['价格变化'] = f"{prev_day['收盘']} → {latest['收盘']}"
            # Condition 6: bullish moving-average alignment, with MA5 rising
            ma_condition = (latest['MA5'] > latest['MA10']) and \
                           (latest['MA10'] > latest['MA20']) and \
                           (hist_df['MA5'].iloc[-1] > hist_df['MA5'].iloc[-2])
            conditions['conditions']['均线多头'] = ma_condition
            conditions['details']['均线'] = f"MA5={latest['MA5']:.2f}, MA10={latest['MA10']:.2f}, MA20={latest['MA20']:.2f}"
            # Condition 7: close above open (proxy for price above the intraday average)
            conditions['conditions']['分时均价'] = latest['收盘'] > latest['开盘']
            conditions['details']['分时价差'] = f"开盘 {latest['开盘']} → 收盘 {latest['收盘']}"
            # Condition 8: large-order net inflow, both today and over the last 3 rows
            try:
                # Derive the market from the code prefix instead of hard-coding "sh"
                market = "sh" if symbol.startswith('6') else "sz"
                fund_flow = ak.stock_individual_fund_flow(symbol, market=market).iloc[:3]
                fund_condition = (fund_flow['大单净流入'].sum() > 0) and (fund_flow['大单净流入'].iloc[0] > 0)
                conditions['conditions']['资金流入'] = fund_condition
                conditions['details']['资金流入'] = fund_flow['大单净流入'].tolist()
            except Exception as e:
                logger.error(f"Fund-flow fetch failed: {symbol} - {str(e)}")
                conditions['conditions']['资金流入'] = False
            logger.debug(f"{symbol} detailed check results: {conditions}")
            return conditions
        except Exception as e:
            logger.error(f"Error while processing {symbol}: {str(e)}", exc_info=True)
            return None
    def run(self):
        selected_stocks = []
        with ThreadPoolExecutor(max_workers=Config.MAX_WORKERS) as executor:
            # Demo run: only the first 20 symbols; drop the slice for a full-market scan
            futures = {executor.submit(self.analyze_conditions, symbol): symbol for symbol in self.symbol_list[:20]}
            for future in as_completed(futures):
                result = future.result()
                if result and all(result['conditions'].values()):
                    selected_stocks.append({
                        '代码': result['code'],
                        '名称': result['name'],
                        **result['details']
                    })
        if selected_stocks:
            df = pd.DataFrame(selected_stocks)
            print("Stocks matching all conditions:")
            print(df.to_string(index=False))
        else:
            logger.warning("No stock matched today. Suggestions:")
            logger.warning("1. Check whether the thresholds are reasonable (turnover-amount range, turnover-rate requirement)")
            logger.warning("2. Inspect the per-stock logs (set LOG_LEVEL=DEBUG)")
            logger.warning("3. Verify the data source (check the csv files under the cache directory)")
            logger.warning("4. Check the float-share calculation (volume is quoted in lots)")
            logger.warning("5. Confirm the fund-flow interface still works (the AKShare library may need updating)")
4. Main program
Entry point:
if __name__ == "__main__":
    try:
        logger.info("====== Stock selection started ======")
        selector = StockSelector()
        selector.run()
        logger.info("====== Finished ======")
    except Exception as e:
        logger.error(f"Program terminated abnormally: {str(e)}", exc_info=True)
III. Running
Before running the code, note the following:
1. Package versions
Python 3.8+
AKShare >= 1.2.0
Pandas >= 1.3.0
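A quick way to confirm the environment is via `importlib.metadata`, which reports whatever versions pip installed (the helper name `installed_version` is made up here; package names must match the PyPI names):

```python
import sys
from importlib import metadata

def installed_version(pkg):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(pkg)
    except metadata.PackageNotFoundError:
        return None

assert sys.version_info >= (3, 8), "Python 3.8+ required"
for pkg in ("akshare", "pandas"):
    print(pkg, installed_version(pkg) or "not installed")
```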
2. Creating the data directories
To avoid hitting the AKShare API on every debugging run, a caching layer was added. Deleting the files under the cache directory forces a data refresh, and adjusting start_date and end_date changes the historical range. On the first run, create the following folders alongside the script: cache/, cache/hist/, cache/shares/.
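The layout implied by DataCache.get_cache_path can also be created programmatically in one go, which saves doing it by hand; a small sketch:

```python
import os

# Matches the cache layout used by DataCache.get_cache_path:
# ./cache/ for the stock list, ./cache/hist/ and ./cache/shares/ for per-symbol files
for sub in ("", "hist", "shares"):
    os.makedirs(os.path.join("./cache", sub), exist_ok=True)
```

With exist_ok=True this is safe to re-run; existing directories and cached files are left untouched.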
3. Logging
Set LOG_LEVEL = logging.DEBUG in the configuration to see the detailed condition checks for each stock.
4. Results
Following the log messages, work through the data and each condition step by step to see why no candidate was selected.
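To speed up that triage, a small helper can list which conditions ruled a stock out, given a result in the {'conditions': {...}} shape that analyze_conditions returns (the helper name `failed_conditions` is made up for illustration):

```python
def failed_conditions(result):
    """Return the names of the conditions that evaluated to False."""
    return [name for name, ok in result['conditions'].items() if not ok]

# Example with a hand-written result in the same shape as analyze_conditions output
sample = {'conditions': {'涨幅': True, '量比': False, '换手率': True, '资金流入': False}}
print(failed_conditions(sample))  # the indicators to investigate first
```

Running this over every non-None result (instead of only the stocks that pass all checks) quickly shows which single condition is filtering out most of the universe.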