import json
import os
import platform
import subprocess
import threading
import wx
import wx.gizmos as gizmos
from config import DATA_DIR_QUOTES, DATA_TASKS
class StrategyPanel(wx.Panel):
    """Panel showing the strategy tree plus "open directory" / "reload" buttons.

    The tree has a hidden root with one node per category.  Remote strategy
    lists are fetched on background threads and handed back to the GUI thread
    with ``wx.CallAfter``; every tree item created for a strategy carries the
    strategy mapping returned by the API as its item data.
    """

    # Characters replaced with "_" before a strategy name is used as part of
    # a file name (path separators and characters Windows rejects).
    _UNSAFE_FILENAME_CHARS = '\\/:*?"<>|'

    def __init__(self, parent):
        """Create the panel as a child of *parent* and build its widgets."""
        super().__init__(parent)
        # Tree item the context menu was last opened on; None until then.
        self.selected_item = None
        self.init_ui()

    def init_ui(self):
        """Build the title, the strategy tree and the button row."""
        sizer = wx.BoxSizer(wx.VERTICAL)

        title = wx.StaticText(self, label="策略管理", style=wx.ALIGN_CENTER)
        title.SetFont(
            wx.Font(
                12,
                wx.FONTFAMILY_DEFAULT,
                wx.FONTSTYLE_NORMAL,
                wx.FONTWEIGHT_BOLD,
            )
        )
        sizer.Add(title, 0, wx.EXPAND | wx.ALL, 5)

        self.tree = wx.TreeCtrl(
            self,
            style=(
                wx.TR_DEFAULT_STYLE
                | wx.TR_HIDE_ROOT
                | wx.TR_FULL_ROW_HIGHLIGHT
                | wx.TR_ROW_LINES
                | wx.WANTS_CHARS
            ),
        )
        self.tree.Bind(wx.EVT_TREE_SEL_CHANGED, self.on_tree_select)
        self.tree.Bind(wx.EVT_CONTEXT_MENU, self.on_context_menu)
        # load_strategies() creates the root and the category nodes itself.
        self.load_strategies()
        sizer.Add(self.tree, 1, wx.EXPAND | wx.ALL, 5)

        btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
        self.open_btn = wx.Button(self, label="策略目录")
        self.reload_btn = wx.Button(self, label="重新加载")
        btn_sizer.Add(self.open_btn, 0, wx.RIGHT, 5)
        btn_sizer.Add(self.reload_btn, 0, wx.RIGHT, 5)
        sizer.Add(btn_sizer, 0, wx.ALIGN_CENTER | wx.TOP | wx.BOTTOM, 10)

        self.SetSizer(sizer)
        self.open_btn.Bind(wx.EVT_BUTTON, self.open_task_dir)
        self.reload_btn.Bind(wx.EVT_BUTTON, self.reload_tasks)

    def on_context_menu(self, event):
        """Show the "download strategy" context menu for a tree item.

        The item is the one under the pointer, or the current selection when
        the menu was requested from the keyboard.  Nothing is shown when no
        item can be determined.
        """
        screen_pos = event.GetPosition()
        if screen_pos == wx.DefaultPosition:
            # Keyboard-invoked menu: there is no pointer position to test.
            item = self.tree.GetSelection()
        else:
            # The event reports screen coordinates; HitTest wants client ones.
            item, _flags = self.tree.HitTest(self.tree.ScreenToClient(screen_pos))
        if not item.IsOk():
            return

        self.tree.SelectItem(item)
        self.selected_item = item

        menu = wx.Menu()
        download_item = menu.Append(wx.ID_ANY, "下载策略到本地")
        # Bind on the menu, not the panel, so the handler goes away with the
        # menu instead of piling up on the panel with every right-click.
        menu.Bind(wx.EVT_MENU, self.on_download, download_item)
        try:
            self.tree.PopupMenu(menu)
        finally:
            menu.Destroy()

    def on_download(self, event):
        """Start downloading the strategy the context menu was opened on."""
        item = self.selected_item
        if item is None or not item.IsOk():
            wx.MessageBox("请先选择一个策略", "错误", wx.OK | wx.ICON_ERROR)
            return

        data = self.tree.GetItemPyData(item)
        try:
            strategy_id = data["id"]
        except (TypeError, KeyError):
            # Category nodes carry no item data, so there is nothing to index.
            strategy_id = None
        if not strategy_id:
            wx.MessageBox("无法获取策略ID", "错误", wx.OK | wx.ICON_ERROR)
            return

        threading.Thread(
            target=self.fetch_strategy, args=(strategy_id,), daemon=True
        ).start()

    def fetch_strategy(self, strategy_id):
        """Fetch one strategy and schedule saving it (worker-thread entry point).

        All GUI work is marshalled to the GUI thread with ``wx.CallAfter``.
        """
        try:
            from common.api import fetch_strategy as api_fetch_strategy

            data, code = api_fetch_strategy(strategy_id)
            if code != 200:
                wx.CallAfter(
                    wx.MessageBox, data['message'], "下载失败", wx.OK | wx.ICON_ERROR
                )
            else:
                data['id'] = strategy_id
                wx.CallAfter(self.save_strategy, data)
        except Exception as e:
            wx.CallAfter(
                wx.MessageBox, f"发生异常: {str(e)}", "错误", wx.OK | wx.ICON_ERROR
            )

    def save_strategy(self, data):
        """Convert *data* to TOML and write it to ``DATA_TASKS``.

        The file is named ``<name>_<id>.toml``; characters in the name that
        are unsafe in file names are replaced with ``_``.  The outcome is
        reported in a message box.
        """
        from core.task import dict_to_task, task_to_toml

        try:
            toml_data = task_to_toml(dict_to_task(data))
            # The name comes from the server: keep it from escaping DATA_TASKS
            # or producing a file name the OS rejects.
            safe_name = "".join(
                "_" if ch in self._UNSAFE_FILENAME_CHARS else ch
                for ch in str(data['name'])
            )
            path = DATA_TASKS.joinpath(f"{safe_name}_{data['id']}.toml").resolve()
            # Explicit UTF-8 so non-ASCII content does not depend on the
            # platform's default encoding.
            with open(path, "w", encoding="utf-8") as f:
                f.write(toml_data)
            wx.MessageBox(f"保存{path}成功!", "提示", wx.OK | wx.ICON_INFORMATION)
        except IOError as e:
            wx.MessageBox(f"文件保存失败: {str(e)}", "错误", wx.OK | wx.ICON_ERROR)

    def on_tree_select(self, event):
        """Open the detail page of the selected strategy, if it has an id."""
        item = event.GetItem()
        if not item.IsOk():
            # Selection events may arrive without a valid item.
            return
        data = self.tree.GetItemPyData(item)
        if data and 'id' in data:
            from config import HOST

            url = f'{HOST}/strategy/{data["id"]}'
            self.Parent.Parent.show_page(url, "策略详情")

    def reload_tasks(self, event):
        """Rebuild the tree and restart the background fetches.

        NOTE: the confirmation is shown as soon as the reload has been
        started; fetch failures are reported separately via ``show_error``.
        """
        self.load_strategies()
        wx.MessageBox('重新加载策略列表成功!')

    def open_task_dir(self, event):
        """Open the ``DATA_TASKS`` directory with the system file manager."""
        target_dir = str(DATA_TASKS.resolve())
        if not os.path.exists(target_dir):
            wx.MessageBox(f"目录不存在: {target_dir}", "错误", wx.OK | wx.ICON_ERROR)
            return
        try:
            self._open_with_system_default(target_dir)
        except Exception as e:
            wx.MessageBox(f"无法打开目录: {str(e)}", "错误", wx.OK | wx.ICON_ERROR)

    def on_item_double_click(self, event):
        """Open the TOML file of a double-clicked item of type "strategy".

        NOTE(review): nothing in this class binds this handler, and it expects
        item data shaped ``{"type": ..., "data": {...}}`` — confirm the caller.
        """
        item = event.GetItem()
        if not item.IsOk():
            return
        item_data = self.tree.GetItemPyData(item)
        if isinstance(item_data, dict) and item_data.get("type") == "strategy":
            data = item_data["data"]
            file_path = DATA_TASKS.joinpath(data['name'] + '.toml')
            self.open_file_with_default_editor(str(file_path.resolve()))

    def show_error(self, message):
        """Show *message* in an error message box."""
        wx.MessageBox(message, "错误", wx.OK | wx.ICON_ERROR)

    def fetch_all_strategies(self):
        """Fetch every strategy and schedule ``update_all_strategies``.

        Intended to run on a worker thread; results and errors are passed to
        the GUI thread with ``wx.CallAfter``.
        """
        try:
            from common.api import fetch_strategies

            data = fetch_strategies()
        except (json.JSONDecodeError, ValueError) as e:
            wx.CallAfter(self.show_error, f"数据错误: {str(e)}")
        except Exception as e:
            wx.CallAfter(self.show_error, f"未知错误: {str(e)}")
        else:
            wx.CallAfter(self.update_all_strategies, data)

    def fetch_my_strategies(self):
        """Fetch the user's own strategies and schedule ``update_ui``.

        Intended to run on a worker thread; results and errors are passed to
        the GUI thread with ``wx.CallAfter``.
        """
        try:
            from common.api import fetch_my_strategies

            data = fetch_my_strategies()
        except (json.JSONDecodeError, ValueError) as e:
            wx.CallAfter(self.show_error, f"数据错误: {str(e)}")
        except Exception as e:
            wx.CallAfter(self.show_error, f"未知错误: {str(e)}")
        else:
            wx.CallAfter(self.update_ui, data)

    def update_all_strategies(self, strategies):
        """Distribute *strategies* over the category nodes.

        A strategy is added to every category it matches (VIP-only, official,
        ``access_type == 'points'``, ``access_type == 'public'``), and each
        category label gets its item count appended.
        """
        categories = (
            (self.official_node, lambda s: s.get('is_official')),
            (self.vip_only_node, lambda s: s.get('is_vip_only')),
            (self.public_node, lambda s: s.get('access_type') == 'public'),
            (self.points_node, lambda s: s.get('access_type') == 'points'),
        )
        strategies = strategies or []
        for node, belongs in categories:
            members = [strategy for strategy in strategies if belongs(strategy)]
            self._populate_node(node, members)

    def update_ui(self, strategies):
        """Fill the "my strategies" node with *strategies* and show the count."""
        self._populate_node(self.my_strategies, strategies or [])

    def _populate_node(self, node, strategies):
        """Append *strategies* under *node*, add the count to its label, expand it."""
        label = self.tree.GetItemText(node) + f'({len(strategies)})'
        self.tree.SetItemText(node, label)
        for strategy in strategies:
            item = self.tree.AppendItem(node, strategy["name"])
            self.tree.SetItemPyData(item, strategy)
        if strategies:
            # ExpandAll() in load_strategies() runs before any children exist,
            # so expand here once there is something to show.
            self.tree.Expand(node)

    def load_strategies(self):
        """Rebuild the category nodes and fetch their content in the background."""
        self.tree.DeleteAllItems()
        # Any previously remembered item was just deleted.
        self.selected_item = None

        root = self.tree.AddRoot("所有策略")
        self.official_node = self.tree.AppendItem(root, "实验室策略")
        self.vip_only_node = self.tree.AppendItem(root, "星球会员专属策略")
        self.public_node = self.tree.AppendItem(root, "公开策略")
        self.points_node = self.tree.AppendItem(root, "积分查看策略")
        self.my_strategies = self.tree.AppendItem(root, "我创建的策略")
        self.local_node = self.tree.AppendItem(root, "本地策略")

        # Pass the bound methods themselves: calling them here would run the
        # network requests on the GUI thread and give Thread a None target.
        for target in (self.fetch_my_strategies, self.fetch_all_strategies):
            threading.Thread(target=target, daemon=True).start()

        self.tree.ExpandAll()

    @staticmethod
    def _open_with_system_default(path):
        """Open *path* with the platform's default handler.

        Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
        elsewhere.  Exceptions from launching the handler propagate.
        """
        system = platform.system()
        if system == "Windows":
            os.startfile(path)
        elif system == "Darwin":
            subprocess.run(["open", path])
        else:
            subprocess.run(["xdg-open", path])

    def open_file_with_default_editor(self, file_path):
        """Open *file_path* with the system's default application."""
        if not os.path.exists(file_path):
            wx.MessageBox(f"文件不存在: {file_path}", "错误", wx.OK | wx.ICON_ERROR)
            return
        try:
            self._open_with_system_default(file_path)
        except Exception as e:
            wx.MessageBox(f"无法打开文件: {str(e)}", "错误", wx.OK | wx.ICON_ERROR)