import json
import os
import platform
import subprocess
import threading

import wx
import wx.gizmos as gizmos

from config import DATA_DIR_QUOTES, DATA_TASKS


class StrategyPanel(wx.Panel):
    """Panel that lists strategies in a tree and offers download/folder actions.

    The tree has one hidden root and six category nodes.  Strategy lists are
    fetched through ``common.api`` on worker threads; all widget updates are
    marshalled back to the GUI thread with ``wx.CallAfter``.
    """

    def __init__(self, parent):
        """Create the panel as a child of *parent* and build its widgets."""
        super(StrategyPanel, self).__init__(parent)
        # Tree item last chosen through the context menu (None until then).
        self.selected_item = None
        self.init_ui()

    def init_ui(self):
        """Build the title, the strategy tree and the two buttons."""
        sizer = wx.BoxSizer(wx.VERTICAL)

        title = wx.StaticText(self, label="策略管理", style=wx.ALIGN_CENTER)
        title.SetFont(
            wx.Font(
                12,
                wx.FONTFAMILY_DEFAULT,
                wx.FONTSTYLE_NORMAL,
                wx.FONTWEIGHT_BOLD,
            )
        )
        sizer.Add(title, 0, wx.EXPAND | wx.ALL, 5)

        self.tree = wx.TreeCtrl(
            self,
            style=(
                wx.TR_DEFAULT_STYLE
                | wx.TR_HIDE_ROOT
                | wx.TR_FULL_ROW_HIGHLIGHT
                | wx.TR_ROW_LINES
                | wx.WANTS_CHARS
            ),
        )
        self.tree.Bind(wx.EVT_TREE_SEL_CHANGED, self.on_tree_select)
        self.tree.Bind(wx.EVT_CONTEXT_MENU, self.on_context_menu)
        # load_strategies() creates the root and the category nodes itself.
        self.load_strategies()
        sizer.Add(self.tree, 1, wx.EXPAND | wx.ALL, 5)

        btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
        self.open_btn = wx.Button(self, label="策略目录")
        self.reload_btn = wx.Button(self, label="重新加载")
        btn_sizer.Add(self.open_btn, 0, wx.RIGHT, 5)
        btn_sizer.Add(self.reload_btn, 0, wx.RIGHT, 5)
        sizer.Add(btn_sizer, 0, wx.ALIGN_CENTER | wx.TOP | wx.BOTTOM, 10)

        self.SetSizer(sizer)

        self.open_btn.Bind(wx.EVT_BUTTON, self.open_task_dir)
        self.reload_btn.Bind(wx.EVT_BUTTON, self.reload_tasks)

    def on_context_menu(self, event):
        """Handle a context-menu request on the tree.

        Selects the targeted item, remembers it in ``self.selected_item`` and
        pops up a menu with a single "download" entry.  Does nothing when no
        valid item is targeted.
        """
        screen_pos = event.GetPosition()
        if screen_pos == wx.DefaultPosition:
            # Keyboard-triggered menu: there is no pointer position, so act
            # on the current selection instead.
            item = self.tree.GetSelection()
        else:
            # The event carries screen coordinates, HitTest expects
            # coordinates relative to the tree control.
            item, _flags = self.tree.HitTest(self.tree.ScreenToClient(screen_pos))

        if not item.IsOk():
            return

        self.tree.SelectItem(item)
        self.selected_item = item

        menu = wx.Menu()
        try:
            download_item = menu.Append(wx.ID_ANY, "下载策略到本地")
            self.Bind(wx.EVT_MENU, self.on_download, download_item)
            self.tree.PopupMenu(menu)
        finally:
            menu.Destroy()

    def on_download(self, event):
        """Handle the "download" menu entry for ``self.selected_item``.

        Shows an error dialog when nothing is selected or the item carries no
        strategy id; otherwise starts a worker thread running
        ``fetch_strategy``.
        """
        item = getattr(self, 'selected_item', None)
        if item is None or not item.IsOk():
            wx.MessageBox("请先选择一个策略", "错误", wx.OK | wx.ICON_ERROR)
            return

        data = self.tree.GetItemPyData(item)
        # Category nodes have no item data attached, so guard against None.
        strategy_id = data.get('id') if data else None
        if not strategy_id:
            wx.MessageBox("无法获取策略ID", "错误", wx.OK | wx.ICON_ERROR)
            return

        threading.Thread(target=self.fetch_strategy, args=(strategy_id,)).start()

    def fetch_strategy(self, strategy_id):
        """Fetch one strategy and hand the result to the GUI thread.

        Intended to run on a worker thread.  On status code 200 the payload is
        tagged with ``id`` and passed to ``save_strategy``; otherwise the
        payload's ``message`` is shown.  Any exception is reported in a dialog.
        """
        try:
            # Aliased so the API function is not confused with this method.
            from common.api import fetch_strategy as api_fetch_strategy

            data, code = api_fetch_strategy(strategy_id)
            if code != 200:
                wx.CallAfter(
                    wx.MessageBox,
                    data['message'],
                    "下载失败",
                    wx.OK | wx.ICON_ERROR,
                )
            else:
                data['id'] = strategy_id
                wx.CallAfter(self.save_strategy, data)
        except Exception as e:
            wx.CallAfter(
                wx.MessageBox,
                f"发生异常: {str(e)}",
                "错误",
                wx.OK | wx.ICON_ERROR,
            )

    def save_strategy(self, data):
        """Convert *data* to TOML and write it to ``DATA_TASKS``.

        The file is named ``<name>_<id>.toml``.  A dialog reports success or
        an I/O failure.
        """
        from core.task import dict_to_task, task_to_toml

        try:
            task = dict_to_task(data)
            toml_data = task_to_toml(task)
            path = DATA_TASKS.joinpath(f"{data['name']}_{data['id']}.toml").resolve()
            # Explicit UTF-8: the platform default encoding may not be able to
            # represent non-ASCII strategy names or contents.
            with open(path, "w", encoding="utf-8") as f:
                f.write(toml_data)
            wx.MessageBox(f"保存{path}成功!", "提示", wx.OK | wx.ICON_INFORMATION)
        except IOError as e:
            wx.MessageBox(f"文件保存失败: {str(e)}", "错误", wx.OK | wx.ICON_ERROR)

    def on_tree_select(self, event):
        """Handle a tree selection change.

        When the selected item carries data with an ``id``, asks the
        grandparent window to show the strategy's detail page.
        """
        item = event.GetItem()
        data = self.tree.GetItemPyData(item)
        if data and 'id' in data:
            from config import HOST

            url = f'{HOST}/strategy/{data["id"]}'
            self.Parent.Parent.show_page(url, "策略详情")

    def reload_tasks(self, event):
        """Rebuild the tree and start re-fetching the strategy lists."""
        self.load_strategies()
        wx.MessageBox('重新加载策略列表成功!')

    def open_task_dir(self, event):
        """Open the ``DATA_TASKS`` directory with the system file manager."""
        target_dir = str(DATA_TASKS.resolve())
        if not os.path.exists(target_dir):
            wx.MessageBox(f"目录不存在: {target_dir}", "错误", wx.OK | wx.ICON_ERROR)
            return

        try:
            self._open_with_system_default(target_dir)
        except Exception as e:
            wx.MessageBox(f"无法打开目录: {str(e)}", "错误", wx.OK | wx.ICON_ERROR)

    def on_item_double_click(self, event):
        """Open the local TOML file of a double-clicked strategy item.

        NOTE(review): no visible code binds this handler or attaches item data
        with ``type``/``data`` keys -- confirm it is still in use.
        """
        item = event.GetItem()
        item_data = self.tree.GetItemPyData(item)
        if item_data and item_data.get("type") == "strategy":
            data = item_data["data"]
            file_path = DATA_TASKS.joinpath(data['name'] + '.toml')
            self.open_file_with_default_editor(str(file_path.resolve()))

    def show_error(self, message):
        """Show *message* in an error dialog; must run on the GUI thread."""
        wx.MessageBox(message, "错误", wx.OK | wx.ICON_ERROR)

    def fetch_all_strategies(self):
        """Fetch the full strategy list and schedule the tree update.

        Intended to run on a worker thread; failures are reported through
        ``show_error`` on the GUI thread.
        """
        from common.api import fetch_strategies

        try:
            data = fetch_strategies()
            wx.CallAfter(self.update_all_strategies, data)
        except (json.JSONDecodeError, ValueError) as e:
            wx.CallAfter(self.show_error, f"数据错误: {str(e)}")
        except Exception as e:
            wx.CallAfter(self.show_error, f"未知错误: {str(e)}")

    def fetch_my_strategies(self):
        """Fetch the user's own strategies and schedule the tree update.

        Intended to run on a worker thread; failures are reported through
        ``show_error`` on the GUI thread.
        """
        from common.api import fetch_my_strategies

        try:
            data = fetch_my_strategies()
            wx.CallAfter(self.update_ui, data)
        except (json.JSONDecodeError, ValueError) as e:
            wx.CallAfter(self.show_error, f"数据错误: {str(e)}")
        except Exception as e:
            wx.CallAfter(self.show_error, f"未知错误: {str(e)}")

    def _append_strategy(self, node, strategy):
        """Add *strategy* as a child of *node* and attach it as item data."""
        item = self.tree.AppendItem(node, strategy["name"])
        self.tree.SetItemPyData(item, strategy)

    def _append_count(self, node, count):
        """Append ``(count)`` to the label of *node*."""
        self.tree.SetItemText(node, self.tree.GetItemText(node) + f'({count})')

    def update_all_strategies(self, strategies):
        """Distribute *strategies* over the category nodes and label counts.

        A strategy may land in several categories: the VIP and official flags
        and the access type are checked independently.
        """
        count_public = 0
        count_vip = 0
        count_official = 0
        count_points = 0

        for strategy in strategies:
            # `== True` is kept on purpose: the payload's value types are not
            # visible here, so plain truthiness could change which items match.
            if strategy['is_vip_only'] == True:  # noqa: E712
                self._append_strategy(self.vip_only_node, strategy)
                count_vip += 1
            if strategy['is_official'] == True:  # noqa: E712
                self._append_strategy(self.official_node, strategy)
                count_official += 1
            if strategy['access_type'] == 'points':
                self._append_strategy(self.points_node, strategy)
                count_points += 1
            if strategy['access_type'] == 'public':
                self._append_strategy(self.public_node, strategy)
                count_public += 1

        self._append_count(self.official_node, count_official)
        self._append_count(self.vip_only_node, count_vip)
        self._append_count(self.public_node, count_public)
        self._append_count(self.points_node, count_points)

    def update_ui(self, strategies):
        """Fill the "my strategies" node with *strategies* and label its count."""
        self._append_count(self.my_strategies, len(strategies))
        for strategy in strategies:
            self._append_strategy(self.my_strategies, strategy)

    def load_strategies(self):
        """Rebuild the category nodes and start fetching their contents."""
        self.tree.DeleteAllItems()
        # Item ids of the old tree are no longer valid after the reset.
        self.selected_item = None

        root = self.tree.AddRoot("所有策略")
        self.official_node = self.tree.AppendItem(root, "实验室策略")
        self.vip_only_node = self.tree.AppendItem(root, "星球会员专属策略")
        self.public_node = self.tree.AppendItem(root, "公开策略")
        self.points_node = self.tree.AppendItem(root, "积分查看策略")
        self.my_strategies = self.tree.AppendItem(root, "我创建的策略")
        self.local_node = self.tree.AppendItem(root, "本地策略")

        # Pass the bound methods themselves: calling them here would run the
        # fetch on the GUI thread and hand Thread a None target.
        thread = threading.Thread(target=self.fetch_my_strategies)
        thread.start()
        thread_all = threading.Thread(target=self.fetch_all_strategies)
        thread_all.start()

        self.tree.ExpandAll()

    @staticmethod
    def _open_with_system_default(path):
        """Open *path* with the platform's default handler.

        Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
        elsewhere.  Arguments are passed as a list, so the path never goes
        through a shell.
        """
        system = platform.system()
        if system == "Windows":
            os.startfile(path)
        elif system == "Darwin":
            subprocess.run(["open", path])
        else:
            subprocess.run(["xdg-open", path])

    def open_file_with_default_editor(self, file_path):
        """Open *file_path* with the system's default application."""
        if not os.path.exists(file_path):
            wx.MessageBox(f"文件不存在: {file_path}", "错误", wx.OK | wx.ICON_ERROR)
            return

        try:
            self._open_with_system_default(file_path)
        except Exception as e:
            wx.MessageBox(f"无法打开文件: {str(e)}", "错误", wx.OK | wx.ICON_ERROR)