Creating data structures and their characteristics

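# Create the basic containers
import pandas as pd
import numpy as np

# Container types: list, tuple, Series, set, dict, DataFrame, array, matrix
list1 = [1, 2, 3, 4, 5]  # list: ordered, mutable, duplicates allowed
tuple1 = (1, 2, 3, 4, 5)  # tuple: ordered, immutable, duplicates allowed
series1 = pd.Series(list1)  # Series: ordered, mutable, duplicates allowed, labeled by an index
set1 = {1, 2, 3, 4, 5}  # set: unordered, mutable, no duplicates
dict1 = {'a': 1, 'b': (1, 3, 3, 2, 4), 'c': 1}  # dict: keys are unique ('a' is a key, 1 is its value); keeps insertion order since Python 3.7
df = pd.DataFrame(list1)  # DataFrame: ordered, mutable, duplicates allowed
array1 = np.array(list1)  # ndarray: ordered, mutable, duplicates allowed, one dtype for all elements
mat1 = np.matrix('1 2; 3 4')  # matrix: always 2-D; the np.mat alias was removed in NumPy 2.0
# print(list1, tuple1, series1, set1, dict1, df, array1, mat1, sep='\n')
set_of_tuples = set(dict1.items())  # each item is already a (key, value) tuple
# print(set_of_tuples)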
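
The traits listed in the comments above can be checked directly. The following is a minimal sketch that uses only built-in types; the variable names are illustrative and not part of the original notes.

```python
# Lists are ordered and mutable, and they keep duplicates
items = [3, 1, 3]
items.append(2)  # in-place change is allowed

# Tuples are ordered but immutable
point = (1, 2, 3)
try:
    point[0] = 99
    tuple_is_mutable = True
except TypeError:
    tuple_is_mutable = False

# Sets drop duplicates and have no positional index
unique = set(items)

# Dict keys are unique; assigning to an existing key overwrites its value
ages = {'a': 1, 'b': 2}
ages['a'] = 10
key_order = list(ages)  # insertion order is preserved (Python 3.7+)
```

Trying to assign to a tuple element raises TypeError, which is what "immutable" means in practice.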

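Common libraries, installation, and troubleshooting

Installing and updating libraries:

  1. IDE Python environment / CMD: pip install
    An IDE is an application that bundles the tools needed to write software, such as a code editor, a compiler or interpreter, and a debugger. Examples include VSCode and PyCharm.
  2. Jupyter Notebook: !pip install
    The leading ! makes the .ipynb cell run the line as a shell command rather than as Python code. The %pip install magic is also available and installs into the environment of the running kernel.
# Show the pip version
pip --version
# Show the installed version of a library
pip show <library-name>
pip show requests
# Install a specific version of a library
pip install scipy==1.13.0
# Upgrade a specific library
pip install --upgrade <library-name>
# List every installed Python package that has a newer version available
pip list --outdated
# Install the pip-review package
pip install pip-review
# Use pip-review to update all outdated packages in one pass
pip-review --local --interactive

Troubleshooting errors:

  1. Visual Studio Build Tools 2022 sets up the C++ build environment and provides vswhere.exe, which resolves errors about metadata generation or a missing vswhere.exe.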

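Common libraries:

  1. Reading and writing Excel:
    pandas, with openpyxl or xlsxwriter for the newer .xlsx format, and xlrd (read) or xlwt (write) for the legacy .xls format, which do not handle .xlsx.
    For .xlsx files, to_excel writes with xlsxwriter when it is installed and falls back to openpyxl otherwise; pass engine="xlsxwriter" or engine="openpyxl" to choose explicitly. Prefix Windows paths with r (a raw string) so that backslashes are not treated as escape characters.
  2. The with ... as statement:
    with pd.ExcelWriter('PATH', engine='ENGINE') as writer:
    with opens a resource and guarantees that it is closed when the block ends, even if an error occurs. as binds the opened object to a short name. It is typically used for operations that open and close files, such as opening an Excel file for writing.
  3. The os library:
    os.listdir(os.getcwd())  # list the entries of the current directory
    os.rename(src, dst)  # rename a file
    for root, dirs, files in os.walk(file_dir):  # walk a directory tree to collect paths
  4. Pending: beautifulsoup, NumPy, pandas, urllib, sklearn, matplotlib, sys, PyQt5, selenium, tkinter, datetime, win32ui.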
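
The with statement and the os calls listed above can be tried safely in a temporary directory. This is a small sketch using only the standard library; the file and folder names are made up for the example.

```python
import os
import tempfile

# Work inside a throwaway directory so the example leaves nothing behind
with tempfile.TemporaryDirectory() as tmp_dir:
    os.mkdir(os.path.join(tmp_dir, 'sub'))
    note_path = os.path.join(tmp_dir, 'sub', 'note.txt')

    # `with` closes the file when the block ends; `as` names the opened file
    with open(note_path, 'w', encoding='utf-8') as f:
        f.write('hello')
    file_closed = f.closed

    # os.rename changes the file name
    renamed_path = os.path.join(tmp_dir, 'sub', 'renamed.txt')
    os.rename(note_path, renamed_path)

    # os.walk yields (root, dirs, files) for every directory in the tree
    found = []
    for root, dirs, files in os.walk(tmp_dir):
        for name in files:
            found.append(os.path.relpath(os.path.join(root, name), tmp_dir))
```

After the inner with block, f.closed is True even though close() was never called explicitly.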

Python CheatSheet

matplotlib, matplotlib.pyplot

CheatSheet

Pandas

CheatSheet

Numpy

CheatSheet

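Notes on Python built-in variables

Class, Object, Method

1. Everything in Python is an object; what differs is what each object is an instance of. A field usually means an attribute of a class or of an object, such as name and age in the Person class of item 3.

2. A class creates a new type, and an object is an instance of that class. Once an object has been created from a class, the class's methods can work on the object's data. A method is a function that belongs to a class. Fields (which I think of as the initial attributes) and methods together are called the attributes of the class. For example, in my_list = [1, 2, 3], list is the class, my_list is an instance (object) of list, and 1, 2, 3 are the elements of my_list.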

3. The structure of a class:
class Person:
    population = 0  # a class variable; the others are instance variables

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def introduce(self, other):
        print(f"Hello, {other}, my name is {self.name} and I'm {self.age} years old.")

p = Person('Alice', 25)
p.introduce('Bob')
# Output: Hello, Bob, my name is Alice and I'm 25 years old.

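4. __init__ initializes the attributes of a new object. It runs as soon as an object of the class is instantiated and assigns the name and age arguments to self.name and self.age.

5. When a method is called through an object, the object itself is passed automatically as the self argument, so p.introduce('Bob') is not written as p.introduce(self, 'Bob'). Behind the scenes Python calls Person.introduce(p, 'Bob'). Passing the instance by hand as well raises a TypeError, because the method then receives one argument too many. Even a method that takes no other arguments must still declare self in its definition.

6. Method parameters, including those of __init__, can be given default values, and methods can take additional parameters. Any parameter without a default must be supplied when the object is created or the method is called; otherwise a TypeError is raised. Attributes set through self are instance attributes (instance variables); attributes defined directly in the class body are class attributes (class variables).

7. An instance method is not called directly as a = Person.introduce() for two reasons:
a. Calling an instance method on the class without an instance raises an error. The class must first be instantiated; the resulting object (instance) carries the class's attributes and methods, and the method is called through it.
b. Attributes set on the class itself are shared, so every method and every instance sees the same value. Static methods and class methods are the exception: they need no instance and can be called directly on the class. A static method is defined with the @staticmethod decorator and a class method with the @classmethod decorator.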
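class MyClass:
    @staticmethod
    def my_static_method():
        print("This is a static method.")

    @classmethod
    def my_class_method(cls):
        print("This is a class method.")

MyClass.my_static_method()  # Output: This is a static method.
MyClass.my_class_method()  # Output: This is a class method.
8. numpy is a module, numpy.array is a factory function, and numpy.ndarray is a class. In a = numpy.array([1, 2, 3]), a is a variable that refers to an object (instance) of the numpy.ndarray class, and numpy.array is the module-level function that creates it. A factory function can accept many kinds of input; if the class had to build objects from every input type itself, its initialization would become much more complicated.
class Person:
    pass

p = Person()
print(p)  # prints something like <__main__.Person object at 0x...>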
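
The points about self, default arguments, and class versus instance variables can be checked with a small variation of the Person class. This is only a sketch: the default age of 18, the second instance, and the counter update in __init__ are assumptions added for illustration.

```python
class Person:
    population = 0  # class variable, shared by all instances

    def __init__(self, name, age=18):  # age has a default value
        self.name = name  # instance variables
        self.age = age
        Person.population += 1

    def introduce(self, other):
        return f"Hello, {other}, my name is {self.name}."


p = Person('Alice', 25)
q = Person('Carol')  # age falls back to the default

# Calling through the instance and through the class are equivalent
via_instance = p.introduce('Bob')
via_class = Person.introduce(p, 'Bob')

# Passing the instance by hand as well supplies one argument too many
try:
    p.introduce(p, 'Bob')
    extra_arg_error = False
except TypeError:
    extra_arg_error = True
```

Here Person.population ends at 2 because it lives on the class, while name and age differ per instance.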

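if __name__ == '__main__':

if __name__ == '__main__':
    main()
    function()
    print('')
# __name__ is a built-in variable. When the file is executed directly, its value is the string '__main__', and all code in the file runs.
# When the file is imported as a module, __name__ is the module name (the file name without .py), so nothing under the `if` runs.
# This structure is often used to keep test code under the `if`: the tests stay in the file but do not run on import.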
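
The two cases can be observed without creating separate files by hand. The sketch below writes a tiny module to a temporary directory and executes it with the standard library's runpy, once under the name '__main__' and once under a module name; the file name demo_module.py and its contents are invented for the example.

```python
import os
import runpy
import tempfile

source = (
    "value_of_name = __name__\n"
    "ran_main_block = False\n"
    "if __name__ == '__main__':\n"
    "    ran_main_block = True\n"
)

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, 'demo_module.py')
    with open(path, 'w', encoding='utf-8') as f:
        f.write(source)

    # Executed as a script: __name__ is '__main__'
    as_script = runpy.run_path(path, run_name='__main__')
    # Executed under a module name, as an import would do
    as_module = runpy.run_path(path, run_name='demo_module')
```

runpy.run_path returns the module's global variables as a dict, so as_script['ran_main_block'] and as_module['ran_main_block'] show whether the guarded block ran.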

Matplotlib applications

Drawing coordinate axes

import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # font that can render the Chinese labels
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly
# Parameters of the increasing line
a = 0.5  # slope of the line
Rf = 6  # y-axis intercept

# Create the figure and axes
fig, ax = plt.subplots(figsize=(6, 6))

# Draw the axis arrows
ax.arrow(0, 0, 10, 0, head_width=0.2, head_length=0.3, fc='black', ec='black')
ax.arrow(0, 0, 0, 10, head_width=0.2, head_length=0.3, fc='black', ec='black')

# Add the axis titles
ax.text(10, -0.5, 'β', fontsize=12, ha='center')
ax.text(-0.5, 10, 'E(R)', fontsize=12, va='center', rotation='vertical')

# Hide the ticks
ax.set_xticks([])
ax.set_yticks([])

# Hide the border lines (spines)
for spine in ax.spines.values():
    spine.set_visible(False)

# Plot the increasing line
x = np.linspace(0, 10, 100)
y = a * x + Rf
plt.plot(x, y, color='black', linestyle='-', linewidth=2)

# Mark a red point in the middle of the line and label it
mid_point_x = 5  # x coordinate
mid_point_y = a * mid_point_x + Rf  # y coordinate
plt.plot(mid_point_x, mid_point_y, 'ro')  # red marker
ax.text(mid_point_x, mid_point_y + 0.5, '基金P', fontsize=12, color='red', ha='center')  # label above the point

# Mark Rf at the y-axis intercept
ax.plot(0, Rf, 'ro')  # mark the intercept
ax.text(-0.5, Rf, 'Rf', fontsize=12, va='center', ha='right')  # add the label

# Draw the blue dashed line extending right from Rf
plt.plot([0, mid_point_x], [Rf, Rf], color='blue', linestyle='--')

# Draw the blue dashed vertical line from fund P down to where it meets the horizontal blue line
plt.plot([mid_point_x, mid_point_x], [mid_point_y, Rf], color='blue', linestyle='--')

# Show the figure
plt.show()

Drawing a coordinate plot with LaTeX

import numpy as np
import matplotlib.pyplot as plt
# Render text with LaTeX; this requires a LaTeX installation that includes the CJK package
plt.rcParams['text.usetex'] = True
plt.rcParams['text.latex.preamble'] = r'\usepackage{CJK}'
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # font that can render the Chinese labels
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly
# Parameters of the increasing line
a = 0.5  # slope of the line
Rf = 6  # y-axis intercept

# Create the figure and axes
fig, ax = plt.subplots(figsize=(6, 6))

# Draw the axis arrows
ax.arrow(0, 0, 10, 0, head_width=0.2, head_length=0.3, fc='black', ec='black')
ax.arrow(0, 0, 0, 10, head_width=0.2, head_length=0.3, fc='black', ec='black')

# Add the axis titles
ax.text(10, -0.5, r'$\beta$', fontsize=12, ha='center')
ax.text(-0.5, 10, r'$E(R)$', fontsize=12, va='center', rotation='vertical')

# Hide the ticks
ax.set_xticks([])
ax.set_yticks([])

# Hide the border lines (spines)
for spine in ax.spines.values():
    spine.set_visible(False)

# Plot the increasing line
x = np.linspace(0, 10, 100)
y = a * x + Rf
plt.plot(x, y, color='black', linestyle='-', linewidth=2)

# Mark a red point in the middle of the line and label it
mid_point_x = 5  # x coordinate
mid_point_y = a * mid_point_x + Rf  # y coordinate
plt.plot(mid_point_x, mid_point_y, 'ro')  # red marker
ax.text(mid_point_x, mid_point_y + 0.5, r'\begin{CJK}{UTF8}{song}基金P\end{CJK}', fontsize=12, color='red', ha='center')  # label above the point

# Mark Rf at the y-axis intercept
ax.plot(0, Rf, 'ro')  # mark the intercept
ax.text(-0.5, Rf, 'Rf', fontsize=12, va='center', ha='right')  # add the label

# Draw the blue dashed line extending right from Rf
plt.plot([0, mid_point_x], [Rf, Rf], color='blue', linestyle='--')

# Draw the blue dashed vertical line from fund P down to where it meets the horizontal blue line
plt.plot([mid_point_x, mid_point_x], [mid_point_y, Rf], color='blue', linestyle='--')

# Add a text box inside the plot. It states that the slope of any portfolio on the security
# market line is its Treynor ratio, the excess return per unit of systematic risk.
text_box_content = r'\begin{CJK}{UTF8}{song}斜率$=\frac{\overline{R_p}-\overline{R_f}}{\beta_p}$\end{CJK}'+ r'$\\$'+r'\begin{CJK}{UTF8}{song}证券市场线上任一个组合的斜率即为特雷诺比率,\\即单位系统风险下的超额收益率\end{CJK}'

plt.text(3, 5, text_box_content, fontsize=12, bbox=dict(facecolor='white', alpha=0.5), va='top', ha='left', usetex=True)
# va = vertical alignment, ha = horizontal alignment

# Show the figure
plt.show()
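
The text box in the plot states that the slope, (R_p - R_f) / beta_p, is the Treynor ratio. As a quick check with the numbers used in the plot, a minimal sketch follows; the function name treynor_ratio is hypothetical and the values are the plot's illustrative parameters, not market data.

```python
def treynor_ratio(portfolio_return, risk_free_rate, beta):
    """Excess return per unit of systematic risk: (R_p - R_f) / beta_p."""
    if beta == 0:
        raise ValueError("beta must be non-zero")
    return (portfolio_return - risk_free_rate) / beta


# Reuse the numbers from the plot: intercept Rf = 6, slope a = 0.5, fund P at beta = 5
Rf = 6
a = 0.5
beta_p = 5
expected_return_p = a * beta_p + Rf  # y coordinate of fund P
ratio = treynor_ratio(expected_return_p, Rf, beta_p)
```

Because fund P lies on the line y = a * x + Rf, its ratio equals the slope a.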

OS applications

Packaging a Python file

import subprocess
import shutil
import os
import tkinter as tk
from tkinter import filedialog
from tkinter import messagebox

def select_file():
    # Open a file-selection dialog
    filename = filedialog.askopenfilename()
    # filename = input('请输入要打包脚本的文件名:')
    if not filename:  # the dialog was cancelled, so there is nothing to package
        root.destroy()
        return
    # PyInstaller options: --onefile builds a single executable, --noconfirm overwrites existing
    # output without asking, --distpath sets the output folder. Each option and each value is a
    # separate list element, just as they are separated by spaces on the command line.
    subprocess.run(['pyinstaller', '--onefile', '--noconfirm', '--distpath', './', filename])
    shutil.rmtree('./build', ignore_errors=True)  # remove PyInstaller's temporary build folder
    # messagebox.showinfo(f'./{filename.split(".")[0]}.spec')
    # os.remove('./修改文件名-汇总文件名.spec')
    # if os.path.exists(f'./{filename.split(".")[0]}.spec'):
    #     messagebox.showinfo(f'./{filename.split(".")[0]}.spec')
    #     os.remove(f'./{filename.split(".")[0]}.spec')
    root.destroy()  # close the window

root = tk.Tk()
button = tk.Button(root, text="Select file", command=select_file)
button.pack()
root.mainloop()
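
The commented-out cleanup above builds the .spec path from filename.split(".")[0], which still contains the directory part of the selected path. One possible way to locate the leftovers is sketched below with pathlib. The helper names and the sample path are hypothetical, and the sketch assumes PyInstaller's default behaviour of writing build/ and <script name>.spec into the current working directory.

```python
from pathlib import Path


def build_pyinstaller_command(script_path, dist_dir='./'):
    """Return the argument list for a one-file PyInstaller build."""
    return ['pyinstaller', '--onefile', '--noconfirm',
            '--distpath', str(dist_dir), str(script_path)]


def leftover_paths(script_path, work_dir='.'):
    """Paths assumed to be left in work_dir: build/ and <name>.spec."""
    stem = Path(script_path).stem  # file name without directory or extension
    return [Path(work_dir) / 'build', Path(work_dir) / f'{stem}.spec']


command = build_pyinstaller_command('D:/scripts/my tool.py')
leftovers = leftover_paths('D:/scripts/my tool.py')
```

Keeping the command as a list means a path containing spaces, like the sample one, needs no quoting when passed to subprocess.run.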

File compression

import os
import time

# Raw strings keep the backslashes in Windows paths from being read as escape sequences
source = [r'D:\Develop_Blog\source\Repository\Python数据分析\Jungle.py']
target_dir = r'D:\Develop_Blog\source\Repository\Python数据分析\Backup'
# target = target_dir + os.sep + time.strftime('%Y%m%d%H%M%S') + '.zip'
if not os.path.exists(target_dir):
    os.mkdir(target_dir)
today = target_dir + os.sep + time.strftime('%Y%m%d')
now = time.strftime('%H%M%S')
comment = input('Enter a comment -->')
if len(comment) == 0:
    target = today + os.sep + now + '.zip'
else:
    target = today + os.sep + now + '_' + comment.replace(' ', '_') + '.zip'
if not os.path.exists(today):
    os.mkdir(today)
    print('Successfully created directory', today)
# Requires a zip command-line tool on the PATH
zip_command = 'zip -r {0} {1}'.format(target, ' '.join(source))
print('Zip command is:')
print(zip_command)
print('Running:')
if os.system(zip_command) == 0:
    print('Successful backup to', target)
else:
    print('Backup FAILED')
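
The script above depends on an external zip command, which is not available by default on every system. As an alternative sketch (a substitute technique, not the approach of the script above), the standard library's zipfile module can build the same dated archive layout. The function name backup is hypothetical, and this version archives individual files only rather than recursing into directories as zip -r does.

```python
import os
import tempfile
import time
import zipfile


def backup(sources, target_dir, comment=''):
    """Zip the given files into target_dir/<YYYYMMDD>/<HHMMSS>[_comment].zip."""
    today = os.path.join(target_dir, time.strftime('%Y%m%d'))
    os.makedirs(today, exist_ok=True)
    name = time.strftime('%H%M%S')
    if comment:
        name += '_' + comment.replace(' ', '_')
    target = os.path.join(today, name + '.zip')
    with zipfile.ZipFile(target, 'w', zipfile.ZIP_DEFLATED) as archive:
        for path in sources:
            archive.write(path, arcname=os.path.basename(path))
    return target


# Try it on a temporary file instead of the real source paths
with tempfile.TemporaryDirectory() as tmp_dir:
    sample = os.path.join(tmp_dir, 'Jungle.py')
    with open(sample, 'w', encoding='utf-8') as f:
        f.write("print('hi')\n")
    archive_path = backup([sample], os.path.join(tmp_dir, 'Backup'), 'first run')
    with zipfile.ZipFile(archive_path) as archive:
        archived_names = archive.namelist()
    archive_name = os.path.basename(archive_path)
```

os.makedirs with exist_ok=True replaces the two separate existence checks, and no shell quoting is needed for paths that contain spaces.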

Renaming files and summarizing file names

import os
import tkinter
import pandas as pd
from tkinter import filedialog, simpledialog

# Create a hidden root window so that only the dialogs are shown
blank_close = tkinter.Tk()
blank_close.withdraw()

# Open a folder-selection dialog
folder_path = filedialog.askdirectory()

# Read the file names in the folder
filename = os.listdir(folder_path)

# Write the file names to an Excel file, one per row
def summarize_filename(filename):
    df = pd.DataFrame(filename)
    excel_name = simpledialog.askstring("输入框","请输入要保存的Excel文件名: ", parent=blank_close)
    df.to_excel(f'./{excel_name}.xlsx', header=False, index=False, sheet_name='Sheet1')

# Initialize the counters
total_files = len(filename)
modified_files = 0
unmodified_files = 0

# Go through every file in the folder and change the extension of .pdf files to .pptx
def rename_file():
    global modified_files, unmodified_files  # the counters are module-level variables
    if filename:
        for file in filename:
            if file.endswith('.pdf'):
                file_path = os.path.join(folder_path, file)
                base_name = os.path.splitext(file)[0]
                # Build the new file name
                new_filename = base_name + '.pptx'
                new_file_path = os.path.join(folder_path, new_filename)
                try:
                    os.rename(file_path, new_file_path)
                    # Ways to build the message string:
                    # print("File renamed from " + file + " to " + new_filename)  # + concatenation; every operand must be a str
                    # print("File renamed from {} to {}".format(file, new_filename))  # .format(); arguments can be of any type
                    # or: print("File renamed from {0} to {1}".format(file, new_filename))
                    print(f'File renamed from {file} to {new_filename}')
                    modified_files += 1
                except Exception as e:
                    # Print the error message
                    print(f'Error renaming file {file}: {str(e)}')
            else:
                unmodified_files += 1
    else:
        print("Selected folder is empty.")

if __name__ == '__main__':
    summarize_filename(filename)
    rename_file()
    # Print the statistics after renaming so that the counters are up to date
    print(f"Total files: {total_files}")
    print(f"Modified files: {modified_files}")
    print(f"Unmodified files: {unmodified_files}")
    print("File renaming process completed.")
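
Before renaming anything on disk, it can help to preview what would change. The following is a minimal dry-run sketch; the function name plan_renames and the sample file names are hypothetical, and the case-insensitive extension match is a design choice that differs from the endswith('.pdf') check in the script.

```python
import os


def plan_renames(file_names, old_ext='.pdf', new_ext='.pptx'):
    """Return (renames, skipped); renames maps each old name to its new name."""
    renames = {}
    skipped = []
    for name in file_names:
        base, ext = os.path.splitext(name)
        if ext.lower() == old_ext:
            renames[name] = base + new_ext
        else:
            skipped.append(name)
    return renames, skipped


renames, skipped = plan_renames(['a.pdf', 'b.PDF', 'notes.txt', 'archive.tar.pdf'])
```

Returning the plan instead of mutating counters also avoids the need for global state: len(renames) and len(skipped) give the same statistics.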

Project applications

Web scraping application 01

# -*- coding: utf-8 -*-
import datetime
import os
import tkinter
from tkinter import messagebox

import pandas as pd
import win32ui
from retrying import retry
from selenium import webdriver
from selenium.webdriver.common.by import By

blank_close = tkinter.Tk()
blank_close.withdraw()
messagebox.showinfo('提示','确认后导入《律所清单》Excel文件')
start_time = datetime.datetime.now()

# Law-firm site scraping test
# Browser options: headless mode
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
# chrome_options.page_load_strategy = 'none'  # only takes effect if set before webdriver.Chrome(...) is called
browser = webdriver.Chrome(options=chrome_options)

# File-open dialog
dlg = win32ui.CreateFileDialog(1)
dlg.SetOFNInitialDir('D:\\')
dlg.DoModal()
filename = dlg.GetPathName()
save_root = os.path.split(filename)[0] + '\\'
# print(save_root)
# print(filename[:filename.rfind(".")])
LawFirm = pd.read_excel(filename)
# Strip surrounding whitespace from every string cell (from pandas 2.1 the method is named DataFrame.map)
LawFirm = LawFirm.applymap(lambda x: x.strip() if isinstance(x, str) else x)
# The sheet must have exactly two columns; the first becomes the key and the second the URL
Info_dict = dict(LawFirm.values.tolist())
# print(Info_dict['北京市金杜(广州)律师事务所'])

# Field labels shown on each lawyer's page; one output column per label, plus the page link
namelist = ['姓名','性别','民族','执业机构:','执业证号','律师(法律职业)资格证号','执业时间','毕业院校','所读专业','最高学历','取得学历时间','取得学位时间','最高学位','本年度注册情况']
dictionary = {key: [] for key in namelist + ['链接']}
# Number of cells to read in each table row
pool_1 = {'1':6,'2':4,'3':4,'4':4,'5':4,'6':4,'7':1}
# Row -> {cell position: field name}
pool_2 = {'1':{2:'姓名',4:'性别',6:'民族'},'2':{2:'执业机构:',4:'执业证号'},'3':{2:'律师(法律职业)资格证号',4:'执业时间'},'4':{2:'毕业院校',4:'所读专业'},'5':{2:'最高学历',4:'取得学位时间'},'6':{2:'最高学位',4:'取得学历时间'},'7':{2:'本年度注册情况'}}
# @retry(wait_fixed=10, stop_max_attempt_number=1)
Test_env = input('是否测试:是/否')

for q in Info_dict.keys():
    address = Info_dict[q]
    # browser = webdriver.Chrome()
    browser.get(address)
    # Collect the links to the lawyers' pages from the firm page
    xpath = "//div[@class='infodetail']/table/tbody/tr[8]/td[3]/a"
    url_pre = browser.find_elements(By.XPATH, xpath)
    urllist = []
    for i in url_pre:
        url = i.get_attribute("href")
        urllist.append(url)
    # print(urllist)
    # print(len(urllist))

    # Scrape each lawyer's detail page
    number = 0
    for i in urllist:
        browser.get(i)
        for p in range(1, 8):
            for y in range(1, pool_1.get(str(p)) + 1):  # XPath positions start at 1
                xpath = '//div[@class="infodetail"]/table[1]/tbody/tr[' + str(p) + ']/td[' + str(y) + ']'
                url_pre = browser.find_elements(By.XPATH, xpath)
                for t in url_pre:
                    name = t.text
                    if name not in namelist:  # skip the label cells
                        if len(name) == 0:
                            name = None  # placeholder so that all columns stay aligned
                        pool_track = pool_2.get(str(p), {}).get(y)
                        dictionary[pool_track].append(name)
        dictionary['链接'].append(i)
        # In test mode, stop after the first lawyer of each firm
        if Test_env == "是":
            number = number + 1
            if number == 1:
                break

df = pd.DataFrame.from_dict(dictionary, orient='index')
df = df.T
# print(dictionary)
# print(df)
browser.quit()

message = input('文件名: 如:律所遍历 ')
df.to_excel(save_root + message + '.xlsx', sheet_name='律所遍历', header=True, index=False)
end_time = datetime.datetime.now()
messagebox.showinfo('程序完成用时', str(end_time - start_time))
# print('程序完成用时', end_time - start_time)

Quantitative application 01

# Fetch the basic data
def get_data(ID, ID_name):
    df_getdata = w.edb(ID,"20091001","20211031","Period=M;Days=Alldays;Fill=Previous",usedf=True)[1]
    df_getdata.columns = ID_name
    return df_getdata

# Compute the year-over-year change of each indicator
def df_yoy(ID_code):
    yoy_code = [dictionary[i] for i in ID_code]
    df_yoy = w.edb(yoy_code,"20091001","20211031","Period=M;Days=Alldays;Fill=Previous", usedf = True)[1]
    df_yoy.columns = ID_code
    for i in range(len(yoy_code)):
        df_yoy[ID_code[i]+'同比'] = df_yoy[[ID_code[i]]].pct_change(periods=12)
        del df_yoy[ID_code[i]]
    return df_yoy

# Build the return series
def df_return(industry):
    df_return = w.wsd(industry,"close","20090901","20211231","Period=M;Days=Alldays;Fill=Previous",usedf=True)[1]
    df_return.columns=['return']
    df_return = df_return.dropna()
    df_return['return_同比'] = df_return['return'].pct_change(periods = 1)  # change over one period (month)
    del df_return['return']
    df_return = df_return.dropna()
    return df_return

# Percentile rank of each value within the list (0 = smallest, 1 = largest)
def rank_pct(alist):
    alist_sort = list(np.sort(alist))
    result = [alist_sort.index(i)/(len(alist)-1) for i in alist]
    return result

# Count the steps that break a strict run: ty = 1 checks for a continuous rise, ty = 0 for a continuous fall.
# A result of 0 means the list rises (ty = 1) or falls (ty = 0) at every step.
def compare(alist, ty):
    s = 0
    if ty == 1:
        for i in range(len(alist)-1):
            if alist[i] >= alist[i+1]:
                s = s+1
    elif ty == 0:
        for i in range(len(alist)-1):
            if alist[i] <= alist[i+1]:
                s = s+1
    return s

# Compute the five signals K1..K5 of an indicator
def event(alist):
    K1,K2,K3,K4,K5 = [],[],[],[],[]
    # Percentile ranks of the series; missing leading values get the neutral rank 0.5
    alist_drop = pd.DataFrame(alist).dropna().iloc[:,0].tolist()
    alist_drop_chg = pd.DataFrame(alist).dropna().pct_change(periods=1).dropna().iloc[:,0].tolist()
    if len(alist_drop) == len(alist):
        rank = rank_pct(alist)
    else:
        rank = [0.5 for i in range(len(alist)-len(alist_drop))]+rank_pct(alist_drop)
    rank_chg = [0.5 for i in range(len(alist)-len(alist_drop_chg))]+rank_pct(alist_drop_chg)
    for i in range(len(alist)):
        # Signal K1: level relative to history
        if rank[i] < 0.3:
            k1 = -1
        elif 0.3 <= rank[i] <= 0.7:
            k1 = 0
        elif rank[i] > 0.7:
            k1 = 1
        K1.append(k1)
        # Signal K2: direction of the latest change
        if i == 0:
            k2 = 0
        elif np.isnan(alist[i-1]):
            k2 = 0
        elif alist[i]-alist[i-1] > 0:
            k2 = 1
        elif alist[i]-alist[i-1] < 0:
            k2 = -1
        else:  # unchanged, or the current value is missing
            k2 = 0
        K2.append(k2)
        # Signal K3: three consecutive moves in the same direction
        if i <= 3:
            k3 = 0
        elif np.isnan(alist[i-3]):
            k3 = 0
        elif compare(alist[i-3:i+1],1) == 0:  # the factor rose three months in a row
            k3 = 1
        elif compare(alist[i-3:i+1],0) == 0:  # the factor fell three months in a row
            k3 = -1
        else:
            k3 = 0
        K3.append(k3)
        # Signal K4: reversal after three consecutive moves
        if i <= 3:
            k4 = 0
        elif np.isnan(alist[i-3]):
            k4 = 0
        elif compare(alist[i-3-1:i],1) == 0 and alist[i] < alist[i-1]:  # fell in the latest period after rising three months in a row
            k4 = -1
        elif compare(alist[i-3-1:i],0) == 0 and alist[i] > alist[i-1]:  # rose in the latest period after falling three months in a row
            k4 = 1
        else:
            k4 = 0
        K4.append(k4)
        # Signal K5: size of the latest change relative to history
        if rank_chg[i] > 0.8:
            k5 = 1
        elif rank_chg[i] < 0.2:
            k5 = -1
        elif 0.2 <= rank_chg[i] <= 0.8:
            k5 = 0
        K5.append(k5)
    return K1,K2,K3,K4,K5

# Backtest summary
def result_fun(return_list, rate_list):
    result = []
    # Index of the trough: the largest relative drop from the running peak
    i = np.argmax((np.maximum.accumulate(return_list)-return_list)/np.maximum.accumulate(return_list))
    if i == 0:
        return pd.DataFrame()
    j = np.argmax(return_list[:i])  # index of the peak before the trough
    MaxDrawdown = -(return_list[j]-return_list[i])/return_list[j]
    return_year = pow(return_list[-1],12/len(return_list))-1
    volatility = np.std(np.array(rate_list)-1)*pow(12,0.5)
    result.append(return_list[-1])
    result.append(return_year*100)
    result.append(volatility*100)
    result.append(MaxDrawdown*100)
    result.append(result[1]/result[2])  # annualized return divided by volatility; no risk-free rate is subtracted
    result = pd.DataFrame(result).T
    result.columns = ['净值','年化收益率','年化波动率','最大回撤','夏普比率']
    return result

# Fetch the industry data
from WindPy import w
import pandas as pd
import numpy as np
from tqdm import trange
w.start()
pd.set_option('display.max_columns',None)
pd.set_option('display.max_rows',None)
ID_name = '社会消费品零售总额:当月同比,消费者信心指数(月),CPI:当月同比,36大中城市日用工业消费品平均价格:空调机:1.5匹冷暖,36大中城市日用工业消费品平均价格:洗衣机:波轮式(5KG),36大中城市日用工业消费品平均价格:洗衣机:滚筒式(5KG),36大中城市日用工业消费品平均价格:冰箱:210-250立升,长江有色市场:平均价:铜:1,长江有色市场:平均价:铝:A00,价格:冷轧板卷:1.0mm:北京,价格:冷轧板卷:1.0mm:上海,价格:冷轧板卷:1.0mm:广州,中国塑料城价格指数,产量:空调:当月同比,产量:空调:累计同比,产量:家用洗衣机:当月同比,产量:家用洗衣机:累计同比,产量:家用电冰箱:当月同比,产量:家用电冰箱:累计同比,房屋竣工面积:累计同比,(停止)库存:家用空调:当月同比,(停止)库存:冰箱:当月同比,(停止)库存:洗衣机:当月同比,(停止)销量:家用空调:当月同比,(停止)内销量:家用空调:当月同比,(停止)销量:家用空调:累计同比,(停止)内销量:家用空调:累计同比,(停止)销量:冰箱:当月同比,(停止)内销量:冰箱:当月同比,(停止)销量:冰箱:累计同比,(停止)内销量:冰箱:累计同比,(停止)销量:洗衣机:当月同比,(停止)内销量:洗衣机:当月同比,(停止)销量:洗衣机:累计同比,(停止)内销量:洗衣机:累计同比'.split(',')
ID_code = 'M0001428,M0012303,M0000612,S6805558,S6805550,S6805551,S6805559,S0182161,S0182162,S0033141,S0033155,S0033145,S5431605,S0028203,S0028205,S0028211,S0028213,S0028207,S0028209,S0073297,S5616246,S5616354,S5616429,S5616250,S5616256,S5616254,S5616260,S5616358,S5616364,S5616362,S5616368,S5616433,S5616439,S5616437,S5616443'.split(',')
dictionary= dict(zip(ID_name,ID_code))
industry = '801111.SI'  # white-goods (major home appliance) industry index
# len(ID_name)

df=get_data(ID_code,ID_name)
df_m = df[['长江有色市场:平均价:铜:1','长江有色市场:平均价:铝:A00','价格:冷轧板卷:1.0mm:北京','价格:冷轧板卷:1.0mm:上海','价格:冷轧板卷:1.0mm:广州','中国塑料城价格指数']]
df_m=df_m.pct_change(periods=12).dropna()+1
df_m['原材料价格指数']=(df_m['长江有色市场:平均价:铜:1']+df_m['长江有色市场:平均价:铝:A00']+(df_m['价格:冷轧板卷:1.0mm:北京']+df_m['价格:冷轧板卷:1.0mm:上海']+df_m['价格:冷轧板卷:1.0mm:广州'])/3+df_m['中国塑料城价格指数'])/4
df=df.iloc[:,0:7].join(df.iloc[:,13:])
df=df.join(df_m[['原材料价格指数']])
df=df.join(df_yoy(['36大中城市日用工业消费品平均价格:空调机:1.5匹冷暖','36大中城市日用工业消费品平均价格:洗衣机:波轮式(5KG)','36大中城市日用工业消费品平均价格:洗衣机:滚筒式(5KG)','36大中城市日用工业消费品平均价格:冰箱:210-250立升'])*100)
df=df.iloc[:,0:3].join(df.iloc[:,7:])

# Shift the returns by two periods (shift(-2)) so that each row pairs the indicators with the return two periods later
df = df_return(industry).dropna().shift(-2).join(df).iloc[:-2,:]
# df
# Backtest of a simple rule-based strategy
a=df['(停止)销量:家用空调:当月同比'].tolist()
b=df['(停止)销量:冰箱:当月同比'].tolist()
c=df['(停止)销量:洗衣机:当月同比'].tolist()
d=df['原材料价格指数'].tolist()

# Sales of air conditioners, refrigerators, and washing machines are roughly in a 2:1:1 ratio
signal=0.8*np.array(pd.DataFrame(event(a)).sum().tolist())-0.2*np.array(pd.DataFrame(event(d)).sum().tolist())

index_rate = df['return_同比'].tolist()
rate_me=[]
rate_index=[]

# print(type(int(rate)))
for i in range(len(signal)):
    if signal[i] > 0:
        rate = 1+index_rate[i]  # hold the index
    else:
        rate = 1  # stay out of the market, including when the signal is exactly 0
    rate_me.append(rate)
    rate_index.append(1+index_rate[i])
value_me = [1]
value_index = [1]
for i in range(len(rate_me)):
    value_me.append(value_me[i]*rate_me[i])
    value_index.append(value_index[i]*rate_index[i])

import matplotlib.pyplot as plt
plt.style.use('ggplot')
plt.figure(figsize=(8,4),dpi=1000)
plt.rcParams['font.sans-serif']=['SimHei']
plt.rcParams['axes.unicode_minus']=False
plt.xlabel('时间')
plt.ylabel('净值')
plt.plot(pd.DataFrame(value_me,index = pd.date_range('2009-11-30',periods=len(value_me),freq='M')),label = '择时策略')
plt.plot(pd.DataFrame(value_index,index = pd.date_range('2009-11-30',periods=len(value_me),freq='M')),label = '基准指数')
plt.legend()

df_result = pd.concat([result_fun(value_me,rate_me),result_fun(value_index,rate_index)])
df_result.index = ['简单逻辑','白电指数']
# df_result
# df.shape
# Backtest of the timing strategy
index,win_rate,count,profit_loss,record = [],[],[],[],[]
for i in trange(1,df.shape[1]):
    for j in range(5):
        if df.columns[i] != '原材料价格指数':
            df_check = pd.concat([pd.DataFrame(event(df.iloc[:,i].tolist()[:87])[j]),pd.DataFrame(df['return_同比'].tolist()[:87])],axis=1)
        elif df.columns[i] == '原材料价格指数':
            # The raw-material price index is a cost factor, so its signal is inverted
            df_check = pd.concat([pd.DataFrame(-1*np.array(event(df.iloc[:,i].tolist()[:87])[j])),pd.DataFrame(df['return_同比'].tolist()[:87])],axis=1)
        df_check.columns = ['signal','rate']
        df_check['win'] = df_check['rate'].apply(lambda x: 1 if x>0 else 0)
        # Win rate
        try:
            win_rate.append(df_check.groupby('signal').agg('mean')['win'][1])
        except Exception:
            win_rate.append(np.nan)
        # Number of historical occurrences. For example: take all rows of the first indicator,
        # convert them to a list, keep the first 87 entries, pass them to event() to get the five
        # signals, take signal j, convert it to a DataFrame, and count the entries equal to 1.
        # That is how often each signal fired in the first 87 periods. With 30 indicators and
        # 5 signals each, there are 150 signals in total.
        try:
            # count.append(pd.DataFrame(event(df.iloc[:,i].tolist()[:87])[j]).value_counts()[1])
            count.append(pd.Series(pd.DataFrame(event(df.iloc[:,i].tolist()[:87])[j]).values.flatten()).value_counts()[1])
        except Exception:
            count.append(np.nan)
        # Profit/loss ratio
        try:
            profit_loss.append(abs(df_check[df_check['signal'] == 1].groupby('win').agg('mean')['rate'].pct_change(periods = 1)[1]+1))
        except Exception:
            profit_loss.append(np.nan)

        index_name = df.columns[i]
        if j == 0:
            index.append(index_name+';处于历史较高位置')
        elif j == 1:
            index.append(index_name + ';本月较上月变动大于0')
        elif j == 2:
            index.append(index_name + ';连续三个月上涨')
        elif j == 3:
            index.append(index_name + ';连续三个月下跌后上涨')
        elif j == 4:
            index.append(index_name + ';月涨幅处于历史80%以上')
        record.append([i,j])
df_event = pd.concat([pd.DataFrame(index),pd.DataFrame(count),pd.DataFrame(win_rate),pd.DataFrame(profit_loss),pd.DataFrame(record)],axis=1)
df_event.columns = ['信号名称','历史触发次数','胜率','盈亏比','指标索引','信号索引']
# Keep the signals that fired more than 7 times with a win rate above 0.7 and a profit/loss ratio above 1.5
df_event_effect = df_event[(df_event['历史触发次数'] > 7) & (df_event['胜率'] > 0.7) & (df_event['盈亏比'] > 1.5) ]
df_event
df_event_effect
index_id = df_event_effect['指标索引'].tolist()
sig = df_event_effect['信号索引'].tolist()
sig_all = []
# Recompute the selected signals on the out-of-sample periods (from period 87 onward)
for i in range(len(index_id)):
    if index_id[i] == 26:
        sig_all.append(-1*np.array(event(df.iloc[:,index_id[i]].tolist()[87:])[sig[i]]))
    elif index_id[i] != 26:
        sig_all.append(event(df.iloc[:, index_id[i]].tolist()[87:])[sig[i]])
signal = pd.DataFrame(sig_all).replace(-1,0).sum().tolist()
# signal
index_rate = df['return_同比'].tolist()[87:]
rate_me = []
rate_index = []
rate = 0

for i in range(len(signal)):
    if signal[i] > 0:
        rate = 1 + index_rate[i]
    elif signal[i] == 0:
        rate = 1
    rate_me.append(rate)
    rate_index.append(1 + index_rate[i])
value_me = [1]
value_index = [1]
for i in range(len(rate_me)):
    value_me.append(value_me[i]*rate_me[i])
    value_index.append(value_index[i]*rate_index[i])

import matplotlib.pyplot as plt
plt.figure(figsize=(8,4),dpi=1000)
plt.rcParams['font.sans-serif']=['SimHei']
plt.rcParams['axes.unicode_minus']=False
plt.xlabel('时间')
plt.ylabel('净值')
plt.plot(pd.DataFrame(value_me,index = pd.date_range('2017-03-31',periods=len(value_me),freq='M')),label = '择时策略')
plt.plot(pd.DataFrame(value_index,index = pd.date_range('2017-03-31',periods=len(value_me),freq='M')),label = '基准指数')
plt.legend()
df_result = pd.concat([result_fun(value_me,rate_me),result_fun(value_index,rate_index)])
df_result.index = ['择时策略','白电指数']
df_result
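
The backtest relies on three small calculations: the maximum drawdown of a net-value series, its annualized return, and the percentile thresholds behind signal K1. The following standalone sketch restates them with the standard library only. The function names and the sample series are hypothetical, and the drawdown here is taken over the whole series, which is a simplified variant of result_fun.

```python
from itertools import accumulate


def max_drawdown(net_values):
    """Largest relative drop from a running peak, as a negative fraction."""
    running_peak = list(accumulate(net_values, max))
    drawdowns = [(v - peak) / peak for v, peak in zip(net_values, running_peak)]
    return min(drawdowns)


def annualized_return(net_values, periods_per_year=12):
    """Geometric annualized return of a net-value series that starts at 1."""
    return net_values[-1] ** (periods_per_year / len(net_values)) - 1


def level_signal(rank, low=0.3, high=0.7):
    """K1-style signal: -1 below the low percentile, 1 above the high one, else 0."""
    if rank < low:
        return -1
    if rank > high:
        return 1
    return 0


values = [1.0, 1.2, 0.9, 1.1]
mdd = max_drawdown(values)
signals = [level_signal(r) for r in (0.1, 0.5, 0.9)]
```

For the sample series the peak is 1.2 and the trough is 0.9, so the drawdown is (0.9 - 1.2) / 1.2, a 25% drop.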

Three methods for computing VaR in Python

Parametric method

# Set the observation window and the tickers under test
import numpy as np
import yfinance as yf
import pandas as pd
import datetime as dt
import matplotlib.pyplot as plt
from scipy.stats import norm
# data = yf.download("JPM", start="2023-10-02", end="2023-10-31")
years = 15
endDate = dt.datetime.now()
startDate = endDate - dt.timedelta(days = 365*years)
tickers = ['SPY', 'BND', 'GLD', 'QQQ', 'VTI']

# Download the data and compute log returns
# Depending on the yfinance version, the 'Adj Close' column may only be present when auto_adjust=False is passed
adj_close_df = pd.DataFrame()
for ticker in tickers:
    data = yf.download(ticker, start=startDate, end=endDate)
    adj_close_df[ticker] = data['Adj Close']
print(adj_close_df)
log_returns = np.log(adj_close_df / adj_close_df.shift(1))
log_returns = log_returns.dropna()

# Set the weights and the holding period, then compute the portfolio returns
portfolio_value = 1000000
weights = np.array([1/len(tickers)] * len(tickers))
historical_returns = (log_returns * weights).sum(axis=1)
days = 5
historical_x_day_returns = historical_returns.rolling(window=days).sum()

# Build the annualized covariance matrix and compute the portfolio standard deviation
cov_matrix = log_returns.cov() * 252
portfolio_std_dev = np.sqrt(weights.T @ cov_matrix @ weights)

# Compute and print the VaR at several confidence levels
confidence_levels = [0.90, 0.95, 0.99]
VaRs = []
for cl in confidence_levels:
    # norm.ppf(cl) is positive, so VaR is reported as a positive loss amount
    VaR = portfolio_value * (norm.ppf(cl) * portfolio_std_dev * np.sqrt(days / 252) - historical_returns.mean() * days)
    VaRs.append(VaR)

print(f'{"Confidence Level":<20} {"Value at Risk":<20}')
print('-' * 40)
for cl, VaR in zip(confidence_levels, VaRs):
    print(f'{cl * 100:>6.0f}%: {"":<8} ${VaR:>10,.2f}')

# Plot the distribution of portfolio returns and the VaR quantiles
# Convert the returns to dollar amounts using the portfolio value; the first days-1 rows have no full window
historical_x_day_returns_dollar = historical_x_day_returns.dropna() * portfolio_value
# Plot the histogram
plt.hist(historical_x_day_returns_dollar, bins=50, density=True, alpha=0.5, label=f'{days}-Day Returns')
# Draw a vertical line at each VaR level in the loss tail
for cl, VaR in zip(confidence_levels, VaRs):
    plt.axvline(x=-VaR, linestyle='--', color='r', label='VaR at {}% Confidence'.format(int(cl * 100)))
plt.xlabel(f'{days}-Day Portfolio Return ($)')
plt.ylabel('Frequency')
plt.title(f'Distribution of Portfolio {days}-Day Returns and Parametric VaR Estimates')
plt.legend()
plt.show()
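
The parametric formula can be checked without downloading any data. The sketch below uses statistics.NormalDist from the standard library in place of scipy.stats.norm; the function name parametric_var and the inputs (16% annual volatility, zero mean return) are hypothetical values chosen for illustration.

```python
from math import sqrt
from statistics import NormalDist


def parametric_var(value, annual_std, daily_mean, days, confidence):
    """Loss not exceeded with probability `confidence` over `days` trading days."""
    z = NormalDist().inv_cdf(confidence)  # positive quantile, about 1.645 for 95%
    horizon_std = annual_std * sqrt(days / 252)  # scale annual volatility to the horizon
    horizon_mean = daily_mean * days
    return value * (z * horizon_std - horizon_mean)


# Hypothetical inputs: 16% annual volatility, zero mean, 5-day horizon
var_95 = parametric_var(1_000_000, 0.16, 0.0, 5, 0.95)
var_99 = parametric_var(1_000_000, 0.16, 0.0, 5, 0.99)
```

With the quantile taken at the confidence level itself, VaR comes out as a positive loss, and a higher confidence level always gives a larger figure.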

Historical simulation method

# %% [markdown]
# Set the observation window and the tickers under test

import numpy as np
import yfinance as yf
import pandas as pd
import datetime as dt
import matplotlib.pyplot as plt
from scipy.stats import norm
# data = yf.download("JPM", start="2023-10-02", end="2023-10-31")
years = 15
endDate = dt.datetime.now()
startDate = endDate - dt.timedelta(days = 365*years)
tickers = ['SPY', 'BND', 'GLD', 'QQQ', 'VTI']

# Download the data and compute log returns
# Depending on the yfinance version, the 'Adj Close' column may only be present when auto_adjust=False is passed
adj_close_df = pd.DataFrame()
for ticker in tickers:
    data = yf.download(ticker, start=startDate, end=endDate)
    adj_close_df[ticker] = data['Adj Close']
log_returns = np.log(adj_close_df / adj_close_df.shift(1))
log_returns = log_returns.dropna()

# Set the weights and the holding period, then compute the portfolio returns
portfolio_value = 1000000
weights = np.array([1/len(tickers)] * len(tickers))
historical_returns = (log_returns * weights).sum(axis=1)
days = 50
range_returns = historical_returns.rolling(window = days).sum()
range_returns = range_returns.dropna()
print(range_returns)

# Compute the VaR and plot the distribution
confidence_interval = 0.99  # confidence level
VaR = -np.percentile(range_returns, 100 - (confidence_interval * 100))*portfolio_value
print(VaR)

return_window = days
range_returns_dollar = range_returns * portfolio_value

plt.hist(range_returns_dollar, bins=50, density=True)
plt.xlabel(f'{return_window}-Day Portfolio Return (Dollar Value)')
plt.ylabel('Frequency')
plt.title(f'Distribution of Portfolio {return_window}-Day Returns (Dollar Value)')
plt.axvline(-VaR, color='r', linestyle='dashed', linewidth=2, label=f'VaR at {confidence_interval:.0%} confidence level')
plt.legend()
plt.show()
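
Historical simulation reduces to reading a percentile off the observed returns. The sketch below shows that step on a small made-up sample, with a hand-written linear-interpolation percentile standing in for np.percentile; the function names and the sample returns are hypothetical.

```python
def percentile(data, q):
    """Linear-interpolation percentile, q in [0, 100]."""
    ordered = sorted(data)
    position = (len(ordered) - 1) * q / 100
    lower = int(position)
    upper = min(lower + 1, len(ordered) - 1)
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


def historical_var(returns, value, confidence):
    """Historical-simulation VaR: the loss at the (1 - confidence) percentile."""
    return -percentile(returns, 100 * (1 - confidence)) * value


# Hypothetical period returns, from -10% to +10% in 1% steps (21 values)
sample_returns = [x / 100 for x in range(-10, 11)]
var_90 = historical_var(sample_returns, 1_000_000, 0.90)
```

For this sample the 10th percentile is a return of about -8%, so the 90% VaR on a portfolio of 1,000,000 is about 80,000.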

Monte Carlo simulation method

# Set the observation window and the tickers under test
import numpy as np
import yfinance as yf
import pandas as pd
import datetime as dt
import matplotlib.pyplot as plt
from scipy.stats import norm
# data = yf.download("JPM", start="2023-10-02", end="2023-10-31")
years = 15
endDate = dt.datetime.now()
startDate = endDate - dt.timedelta(days = 365*years)
tickers = ['SPY', 'BND', 'GLD', 'QQQ', 'VTI']

# Download the data and compute log returns
# Depending on the yfinance version, the 'Adj Close' column may only be present when auto_adjust=False is passed
adj_close_df = pd.DataFrame()
for ticker in tickers:
    data = yf.download(ticker, start=startDate, end=endDate)
    adj_close_df[ticker] = data['Adj Close']
print(adj_close_df)
log_returns = np.log(adj_close_df / adj_close_df.shift(1))
log_returns = log_returns.dropna()

# Functions for the portfolio's expected return and standard deviation
def expected_return(weights, log_returns):
    return np.sum(log_returns.mean()*weights)

def standard_deviation(weights, cov_matrix):
    variance = weights.T @ cov_matrix @ weights
    return np.sqrt(variance)

# Build the daily covariance matrix, set the weights, and compute the portfolio return and standard deviation
cov_matrix = log_returns.cov()
print(cov_matrix)

portfolio_value = 1000000
weights = np.array([1/len(tickers)]*len(tickers))
portfolio_expected_return = expected_return(weights, log_returns)
portfolio_std_dev = standard_deviation(weights, cov_matrix)

# Function that draws one random z-score for the Monte Carlo simulation
def random_z_score():
    return np.random.normal(0, 1)

# Function that computes the simulated portfolio gain or loss for one scenario
days = 20
def scenario_gain_loss(portfolio_value, portfolio_std_dev, z_score, days):
    return portfolio_value * portfolio_expected_return * days + portfolio_value * portfolio_std_dev * z_score * np.sqrt(days)

# Run 10,000 simulations
simulations = 10000
scenarioReturn = []

for i in range(simulations):
    z_score = random_z_score()
    scenarioReturn.append(scenario_gain_loss(portfolio_value, portfolio_std_dev, z_score, days))

# Set the confidence level, compute the VaR, and plot the distribution
confidence_interval = 0.99  # confidence level
VaR = -np.percentile(scenarioReturn, 100 * (1 - confidence_interval))
print(VaR)

# Plot the distribution of the 10,000 simulated gains and losses, with the VaR marked
plt.hist(scenarioReturn, bins=50, density=True)
plt.xlabel('Scenario Gain/Loss ($)')
plt.ylabel('Frequency')
plt.title(f'Distribution of Portfolio Gain/Loss Over {days} Days')
plt.axvline(-VaR, color='r', linestyle='dashed', linewidth=2, label=f'VaR at {confidence_interval:.0%} confidence level')
plt.legend()
plt.show()
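
Because the simulation draws from a normal distribution, its result should land close to the closed-form normal VaR, which makes a convenient sanity check. The sketch below reproduces the simulation with the standard library's random module and a fixed seed; the function name simulate_var and the inputs (0.03% daily mean, 1% daily standard deviation) are hypothetical.

```python
import random
from math import sqrt
from statistics import NormalDist


def simulate_var(value, daily_mean, daily_std, days, confidence,
                 simulations=10_000, seed=0):
    """Monte Carlo VaR under the assumption of normally distributed returns."""
    rng = random.Random(seed)  # fixed seed for repeatable results
    outcomes = sorted(
        value * daily_mean * days + value * daily_std * sqrt(days) * rng.gauss(0, 1)
        for _ in range(simulations)
    )
    cutoff = int(simulations * (1 - confidence))  # index of the tail quantile
    return -outcomes[cutoff]


mc_var = simulate_var(1_000_000, 0.0003, 0.01, 20, 0.99)

# Closed-form value for the same normal model, for comparison
z = NormalDist().inv_cdf(0.99)
analytic_var = 1_000_000 * (z * 0.01 * sqrt(20) - 0.0003 * 20)
```

The two figures differ only by sampling error, which shrinks as the number of simulations grows.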