社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

使用Python实现量化选股的基本思路

FinTechHi • 2 周前 • 24 次点击  

对投资来说,核心就两件事情:选股、择时。这两点有了后,才涉及到资金管理和仓位管理。

其中选股可以说是前提,对每个人来说也是最核心和最重要的。网上肯定有各种资料和专业的金工研报,方法也五花八门,对我们工程角度思考,还是先构建一个完全能理解的选股的"demo"框架,然后不断的优化每个过程,并通过回测验证或实盘对比,分析自己构建的选股逻辑的准确性。

题外话,我最近也在听北京炒家的一些投资方面的音频,一个重要的体会是:不管以什么方式(主观投资、价值投资、量化交易、打板等等)做交易,关键的是贵在坚持,并不断的优化过程。特别是现在信息特别多,很容易迷失,今天追北京炒家、明天追量化、后天研究通达信公式,这样肯定不容易找到自己的投资体系。(即使我们学习了,感觉悟到了,最后能不能就一定赚到呢?这肯定不一定,但如果无脑投资,只赚运气的钱,那肯定不是正确的路径。)

一.思路

结合一篇金工报告,先看下一个简单的选股逻辑的基本思路,其中涉及到趋势、量价、资金三方面,当然可能还有其他维度。

基于上面的整体思路,可以通过具体的“指标和条件”来落地和实现:

接着就构建具体操作思路:

二.代码

对选股来说,不比之前文章中介绍的一些简单的指标,基本上都依赖OPHLV等日线,或者分钟Kline数据,目前可获取的途径还是有的。但做选股的话,其中数据层面面临着如下几个困难: 

数据范围更广:可能需要一些基本面数据,甚至另类数据(舆情,企业基本信息等等),机构投资者可以购买wind、聚源、通联、天软、米筐、聚宽等数据资讯商的专业数据;而作为我们个人,直接购买的话可能也是一笔不少的费用,通过akshare,tushare等平台获取,数据质量和稳定性上可能又得不到保证。

数据量更大:如果我们要基于全市场选股,那需要把5000只股票的数据都获取到,不管是对我们自己设备的存储,程序的性能,还是程序的设计上都有要求,当然如果自己能确定到一个具体的指数范围内,比如中证500,中证1000指数成分股范围选,那可以更一步缩小范围。

数据周期不同:有的选股指标可能要涉及到更长的周期,有的可能需要日量价数据,有的可能需要实时监控(比如舆情),所以如何对这些不同频数据设计统一的管控也是问题。

今天的代码还是先不考虑这些,先跑通一个流程为原则:

1.导入包

import os
import sys
import logging
import akshare as ak
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

2.配置相关

设置系统默认编码(需在文件开头) 

sys.stdout.reconfigure(encoding='utf-8')  # Python 3.7sys.stderr.reconfigure(encoding='utf-8')

配置区(用户可修改参数)

class Config:    CACHE_DIR = "./cache/"  # 缓存目录    LOG_LEVEL = logging.DEBUG  # 日志级别    MAX_WORKERS = 8  # 并发线程数    CACHE_EXPIRE_DAYS = 3  # 缓存有效期(天)    }

初始化AKShare Session

session = requests.Session()ak.session = session  # 注入自定义session到AKShares

日志配置:

def setup_logger():    if not os.path.exists(Config.CACHE_DIR):        os.makedirs(Config.CACHE_DIR)        log_file = os.path.join(        Config.CACHE_DIR,        f"select_stock_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"    )        logger = logging.getLogger('StockSelector')    logger.setLevel(Config.LOG_LEVEL)        # 文件处理器(UTF-8    file_handler = logging.FileHandler(        filename=log_file,        encoding='utf-8',        mode='w'    )    file_handler.setFormatter(logging.Formatter(        '%(asctime)s - %(levelname)s - %(message)s'    ))        # 控制台处理器(UTF-8    console_handler = logging.StreamHandler()    console_handler.setFormatter(logging.Formatter(        '%(asctime)s - %(levelname)s - %(message)s'    ))        logger.addHandler(file_handler)    logger.addHandler(console_handler)        return loggerlogger = setup_logger()

3.数据缓存

避免每次运行程序都请求Akshare服务端,所以把数据查询后保存在本地,可有于研究。

class DataCache:    @staticmethod    def get_cache_path(data_type, symbol=""):        paths = {            "stock_list""stock_list.csv",            "hist_data"f"hist/{symbol}_hist.csv",            "float_shares"f"shares/{symbol}_shares.csv"        }        return os.path.join(Config.CACHE_DIR, paths[data_type])    @staticmethod    def is_cache_valid(file_path):        if not os.path.exists(file_path):            return False        file_time = datetime.fromtimestamp(os.path.getmtime(file_path))        return (datetime.now() - file_time).days < Config.CACHE_EXPIRE_DAYS    @staticmethod    def save_stock_list():        try:            logger.info("正在获取股票列表...")            df = ak.stock_zh_a_spot_em()  # 移除错误参数            cache_path = DataCache.get_cache_path("stock_list")            df.to_csv(cache_path, index=False, encoding='utf-8-sig')  # 添加BOM头            logger.info(f"股票列表已缓存至 {cache_path}")            return df        except Exception as e:            logger.error(f"获取股票列表失败: {str(e)}")            raise    @staticmethod    def load_stock_list():        cache_path = DataCache.get_cache_path("stock_list")        if os.path.exists(cache_path) and DataCache.is_cache_valid(cache_path):            logger.info(f"从缓存加载股票列表: {cache_path}")            return pd.read_csv(cache_path, encoding='utf-8-sig')        return DataCache.save_stock_list()

核心选股逻辑(优化条件检查)

class StockSelector:    def __init__(self):        self.symbol_list = DataCache.load_stock_list()['代码'].tolist()        self.end_date = datetime.now().strftime('%Y%m%d')        self.start_date = (datetime.now() - timedelta(days=20)).strftime('%Y%m%d')    def get_hist_data(self, symbol):        cache_path = DataCache.get_cache_path("hist_data", symbol)        if os.path.exists(cache_path) and DataCache.is_cache_valid(cache_path):            logger.debug(f"从缓存加载历史数据: {symbol}")            return pd.read_csv(cache_path, parse_dates=['日期'], encoding='utf-8-sig')                logger.info(f"下载历史数据: {symbol}")        hist_df = ak.stock_zh_a_hist(            symbol=symbol, period="daily",            start_date=self.start_date, end_date=self.end_date,            adjust="qfq"


    
        )        hist_df.to_csv(cache_path, index=False, encoding='utf-8-sig')        return hist_df    def get_float_shares(self, symbol):        cache_path = DataCache.get_cache_path("float_shares", symbol)        if os.path.exists(cache_path) and DataCache.is_cache_valid(cache_path):            logger.debug(f"从缓存加载流通股本: {symbol}")            return pd.read_csv(cache_path, encoding='utf-8-sig')['流通股本'].iloc[-1]                logger.info(f"下载流通股本: {symbol}")        shares_df = ak.stock_zh_a_circulate(symbol)        shares_df.to_csv(cache_path, index=False, encoding='utf-8-sig')        return shares_df['流通股本'].iloc[-1]    def analyze_conditions(self, symbol):        try:            logger.info(f"开始处理股票: {symbol}")            hist_df = self.get_hist_data(symbol)                        # 数据校验            if len(hist_df) 20:                logger.warning(f"数据不足: {symbol}(仅{len(hist_df)}日数据)")                return None            if hist_df.isnull().values.any():                logger.warning(f"数据包含空值: {symbol}")                return None            latest = hist_df.iloc[-1]            prev_day = hist_df.iloc[-2]            # 条件检查结果存储            conditions = {                'code': symbol,                'name': DataCache.load_stock_list().query('代码 == @symbol')['名称'].values[0],                'conditions': {},                'details': {}  # 添加详细数据记录            }            # 条件1: 涨幅2%-5%            pct_change = (latest['收盘'] - prev_day['收盘']) / prev_day['收盘'] * 100            conditions['conditions']['涨幅'] = 2 <= pct_change <= 5            conditions['details']['涨幅'] = round(pct_change, 2)            # 条件2: 量比>1            vol_mean_5d = hist_df['成交量'].iloc[-6:-1].mean()            current_vol_ratio = latest['成交量'] / vol_mean_5d if vol_mean_5d > 0 else 0            conditions['conditions']['量比'] = current_vol_ratio > 1            conditions['details']['量比'] = round(current_vol_ratio, 2)            # 条件3: 换手率5%-10%            try:                float_shares = self.get_float_shares(symbol)                turnover_rate = latest['成交量'] * 100 / float_shares * 100                conditions['conditions']['换手率'] = 5 <= turnover_rate <= 10                conditions['details']['换手率'] = round(turnover_rate, 2)            except Exception as e:                logger.error(f"换手率计算失败: {symbol} - {str(e)}")                conditions['conditions']['换手率'] = False            # 条件4: 成交额50-200亿            amount = latest['成交量'] * 100 * latest['收盘']


    
            conditions['conditions']['成交额'] = 50e8 <= amount <= 200e8            conditions['details']['成交额(亿)'] = round(amount / 1e82)            # 条件5: 量价齐升            conditions['conditions']['量价齐升'] = (latest['成交量'] > prev_day['成交量']) & (latest['收盘'] > prev_day['收盘'])            conditions['details']['成交量变化'] = f"{prev_day['成交量']}{latest['成交量']}"            conditions['details']['价格变化'] = f"{prev_day['收盘']}{latest['收盘']}"            # 条件6: 均线多头            hist_df['MA5'] = hist_df['收盘'].rolling(5).mean()            hist_df['MA10'] = hist_df['收盘'].rolling(10).mean()            hist_df['MA20'] = hist_df['收盘'].rolling(20).mean()            ma_condition = (latest['MA5'] > latest['MA10']) & \                          (latest['MA10'] > latest['MA20']) & \                          (hist_df['MA5'].iloc[-1] > hist_df['MA5'].iloc[-2])            conditions['conditions']['均线多头'] = ma_condition            conditions['details']['均线'] = f"MA5={latest['MA5']:.2f}, MA10={latest['MA10']:.2f}, MA20={latest['MA20']:.2f}"            # 条件7: 分时均价线上(使用日线收盘价近似)            conditions['conditions']['分时均价'] = latest['收盘'] > latest['开盘']            conditions['details']['分时价差'] = f"开盘{latest['开盘']}→收盘{latest['收盘']}"            # 条件8: 资金流入            try:                fund_flow = ak.stock_individual_fund_flow(symbol, market="sh").iloc[:3]                fund_condition = (fund_flow['大单净流入'].sum() > 0) & (fund_flow['大单净流入'].iloc[0] > 0)                conditions['conditions']['资金流入'] = fund_condition                conditions['details']['资金流入'] = fund_flow['大单净流入'].tolist()            except Exception as e:                logger.error(f"资金流获取失败: {symbol} - {str(e)}")                conditions['conditions']['资金流入'] = False            logger.debug(f"


    
{symbol} 详细检查结果: {conditions}")            return conditions        except Exception as e:            logger.error(f"处理股票 {symbol} 时出错: {str(e)}", exc_info=True)            return None    def run(self):        selected_stocks = []        with ThreadPoolExecutor(max_workers=Config.MAX_WORKERS) as executor:            futures = {executor.submit(self.analyze_conditions, symbol): symbol for symbol in self.symbol_list[:20]}  # 测试时限制20只                        for future in as_completed(futures):                result = future.result()                if result and all(result['conditions'].values()):                    selected_stocks.append({                        '代码': result['code'],                        '名称': result['name'],                        **result['details']                    })        if selected_stocks:            df = pd.DataFrame(selected_stocks)            print("筛选到符合条件的股票:")            print(df.to_string(index=False))        else:            logger.warning("今日无符合条件股票,分析建议:")            logger.warning("1. 检查各条件阈值是否合理(如成交额范围、换手率要求)")            logger.warning("2. 查看个股详细日志(设置LOG_LEVEL=DEBUG)")            logger.warning("3. 验证数据源是否正常(查看cache目录中的csv文件)")            logger.warning("4. 检查股票流通股本计算是否正确(成交量单位为手)")            logger.warning("5. 确认资金流接口是否有效(可能需更新AKShare库)")

4.主程序

运行程序代码

if __name__ == "__main__":    try:        logger.info("====== 开始选股 ======")        selector = StockSelector()        selector.run()        logger.info("====== 程序执行完成 ======")    except Exception as e:        logger.error(f"程序异常终止: {str(e)}", exc_info=True)


三.运行

运行代码,需要先注意如下几点

1.包版本

Python 3.8+AKShare >= 1.2.0Pandas >= 1.3.0


    

2.数据目录创建

为了避免每次调试程序都需要访问Akshare的api接口读取数据,所以增加了缓存机制,若希望本地缓存更新,删除cache目录下的文件即可强制更新数据,修改start_date和end_date选择历史数据的范围,首次运行代码时,在代码的同级目录创建如下文件夹:

3.日志配置

修改配置中的LOG_LEVEL = logging.DEBUG可以查看每个股票的条件检查细节。

4.运行结果


按照日志信息,需要逐步排查数据及每个条件,看为什么没有选择到标的。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/180095
 
24 次点击