每天下载及更新K线资料,实际上做起来不比之前的程序简单,因为还要考虑已存在DB中的资料不抓,以免资料重覆导致计算错误。另外要说明,这里INSERT资料到资料库的动作,一样是用pandas的to_sql,因为之前在建立Table时已经有产生index,所以在这里必须传入index=False这个参数,如果没有这个参数,会因为DB中已经存在index而产生错误。
首先说明每天下载1分K资料并储存至DB中,请注意此动作请在收盘後再做,因为在盘中是抓得到当天的kbar资料,下列的程序并没有考虑收盘前抓资料的情况。
程序范例如下:
import os, datetime
import shioaji as sj
from shioaji.constant import Exchange
from dotenv import load_dotenv
import pandas as pd
import re #汇入re模组
import sqlite3
update_date = None
load_dotenv('D:/python/shioaji/.env')
conn = sqlite3.connect('C:/shioaji.db')
api = sj.Shioaji()
api.login(
person_id=os.getenv('YOUR_PERSON_ID'),
passwd=os.getenv('YOUR_PASSWORD')
)
print(api.Contracts.Stocks) #等待Contract.Stocks Fetch完成
# 从Contracts中,抓到目前最新的update_date日期
def get_update_date():
global update_date
for exchange in api.Contracts.Stocks:
for stock in exchange:
if stock.exchange in (Exchange.TSE, Exchange.OTC):
if stock.category == '00' or stock.category == '' or stock.update_date == '':
pass
else:
if update_date is None:
update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')
elif datetime.datetime.strptime(stock.update_date, '%Y/%m/%d') > update_date:
update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')
#传入股票代码,抓kbar资料并存至DB中
def update_stock_kbar(stock_code):
#预设start_date为2018-12-07
start_date = datetime.datetime.strptime('2018-12-07', '%Y-%m-%d')
cursor = conn.cursor()
# 先从1分K的Table中,抓该档股票最後的ts
cursor.execute('SELECT ts FROM stocks_1min_kbars WHERE Code = ? ORDER BY ts DESC LIMIT 1', (stock_code, ))
query_result = cursor.fetchone()
if query_result is not None:
start_date = datetime.datetime.strptime(query_result[0], '%Y-%m-%d %H:%M:%S')
start_date = start_date.date()+datetime.timedelta(days=1)
kbars = api.kbars(stock, start=start_date.strftime("%Y-%m-%d"),
end=update_date.strftime("%Y-%m-%d"))
df = pd.DataFrame({**kbars})
df.ts = pd.to_datetime(df.ts)
df['Code'] = stock.code
df.to_sql('stocks_1min_kbars', conn, if_exists='append', index=False)
get_update_date() #先执行一次get_update_date,取得最近一次的update_date
for exchange in api.Contracts.Stocks:
for stock in exchange:
if stock.exchange in (Exchange.TSE, Exchange.OTC):
if stock.category == '00' or stock.category == '':
pass
elif stock.update_date != update_date.strftime("%Y/%m/%d"):
pass
elif re.search('[A-Z]', stock.code) is None:
print(f'start download {stock.code} {stock.name} kbar data...')
update_stock_kbar(stock_code=stock.code)
print(f'{stock.code} {stock.name} kbar data is stored to sqlite')
api.logout()
抓完1分K,接着是就从1分K的资料产生出日K资料。这里要注意的是在读取1分K资料时,有先将dataFrame的index设为ts栏位,这样才可以把资料resample为日K资料,所以在执行to_sql前,要再记得执行reset_index重设index,这样to_sql後ts栏位资料才会正常写入资料库中。
程序范例如下:
import pandas as pd
import sqlite3
import datetime
conn = sqlite3.connect('C:/shioaji.db')
cursor = conn.cursor() #产生cursor物件
# 传入股票代码,产生日K资料并存至资料库
def update_day_kbar(stock_code):
print(f'update day kbars for stock:{stock_code}')
start_date = datetime.datetime.strptime('2018-12-07', '%Y-%m-%d') #预设start_date为2018-12-07
# 从day_kbars中,找该档股票最後的ts
cursor.execute('SELECT ts FROM stocks_day_kbars WHERE Code = ? ORDER BY ts DESC LIMIT 1', (stock_code, ))
query_result = cursor.fetchone()
if query_result is not None:
start_date = datetime.datetime.strptime(query_result[0], '%Y-%m-%d %H:%M:%S')
start_date = start_date.date()+datetime.timedelta(days=1)
df = pd.read_sql('''SELECT Code, Open, High, Low, Close, Volume, ts
FROM stocks_1min_kbars WHERE Code = ? AND ts > ?''',
conn, parse_dates=['ts'], index_col=['ts'], params=(stock_code, start_date))
df_day_kbar = df.resample(rule='1D').agg({
'Code': 'first',
'Open': 'first',
'High': 'max',
'Low': 'min',
'Close': 'last',
'Volume': 'sum'})
df_day_kbar.dropna(axis=0, inplace=True)
df_day_kbar.reset_index(inplace=True) #重设index
df_day_kbar.to_sql('stocks_day_kbars', conn, if_exists='append', index=False)
print(f'stock:{stock_code}, day kbars is update to DB...')
codes = cursor.execute('SELECT DISTINCT code FROM stocks_1min_kbars').fetchall()
for code in codes:
update_day_kbar(code[0])
conn.close() #关闭资料库连线
产生完日K资料後,要再重新计算一次均线相关资料。原本是想要写在上面的产生日K资料的程序里,但发现程序会变得太复杂,所以索性把这部份拆成另一个程序。
程序范例如下:
import pandas as pd
import sqlite3
from statistics import mean
conn = sqlite3.connect('C:/shioaji.db') #建立资料库连线
cursor = conn.cursor() #产生cursor物件
# 更新均线相关资料
def update_MA_data(stock_code):
cursor.execute('''SELECT ts FROM stocks_day_kbars
WHERE Code = ? AND MA5 IS NOT NULL
ORDER BY ts DESC LIMIT 1''', (stock_code, ))
query_results = cursor.fetchone()
if query_results is not None:
start_ts = query_results[0]
# 计算MA5
cursor.execute('''SELECT ts FROM stocks_day_kbars
WHERE Code = ? AND ts > ? AND MA5 IS NULL''', (stock_code, start_ts))
for data in cursor.fetchall():
cursor.execute('''SELECT ts, Close, MA5
FROM stocks_day_kbars
WHERE Code = ? AND ts <= ?
ORDER BY ts DESC LIMIT 5''', (stock_code, data[0]))
close_data = [] #收盘价list
last_ma5 = None
for ma_data in cursor.fetchall():
close_data.append(ma_data[1])
if last_ma5 is None:
last_ma5 = ma_data[2]
ma5 = round(mean(close_data), 2) #用statistics.mean计算均价,并取至小数点後第二位
ma5_diff = round(ma5-last_ma5, 2) #计算MA5的差异数,并取至小数点後第二位
cursor.execute('''UPDATE stocks_day_kbars SET MA5 = ?, MA5_diff = ?
WHERE Code = ? AND ts = ?''', (ma5, ma5_diff, stock_code, data[0]))
conn.commit() #执行上面的update sql
#计算MA20
cursor.execute('''SELECT ts FROM stocks_day_kbars
WHERE Code = ? AND ts > ? AND MA20 IS NULL''', (stock_code, start_ts))
for data in cursor.fetchall():
cursor.execute('''SELECT ts, Close, MA20 FROM stocks_day_kbars
WHERE Code = ? AND ts <= ? ORDER BY ts DESC LIMIT 20''', (stock_code, data[0]))
close_data = []
last_ma20 = None
for ma_data in cursor.fetchall():
close_data.append(ma_data[1])
if last_ma20 is None:
last_ma20 = ma_data[2]
ma20 = round(mean(close_data), 2)
ma20_diff = round(ma20-last_ma20, 2)
cursor.execute('''UPDATE stocks_day_kbars SET MA20 = ?, MA20_diff = ?
WHERE Code = ? AND ts = ?''', (ma20, ma20_diff, stock_code, data[0]))
conn.commit() #执行上面的update sql
codes = cursor.execute('SELECT DISTINCT code FROM stocks_1min_kbars').fetchall()
for code in codes:
print(f'update {code[0]} MA data...')
update_MA_data(code[0])
conn.close() #关闭资料库连线
最後,就是从日K资料中,产生观察名单。第一步我会先用DB Browser for SQLite去撰写SQL并看取出来的资料是不是我要的。撰写完後,按下上方的执行按钮,若SQL语法没有问题,下方就会跑出结果。
有了初步的资料後,再使用python让产生出来的资料更加完整。程序范例如下:
import os, datetime
import shioaji as sj
from shioaji.constant import Exchange
from dotenv import load_dotenv
import pandas as pd
import sqlite3
update_date = None
load_dotenv('D:/python/shioaji/.env')
conn = sqlite3.connect('C:/shioaji.db') #在D槽底下建立资料库
api = sj.Shioaji()
api.login(
person_id=os.getenv('YOUR_PERSON_ID'),
passwd=os.getenv('YOUR_PASSWORD')
)
print(api.Contracts.Stocks) #等待Contract.Stocks Fetch完成
def get_update_date():
global update_date
for exchange in api.Contracts.Stocks:
for stock in exchange:
if stock.exchange in (Exchange.TSE, Exchange.OTC):
if stock.category == '00' or stock.category == '' or stock.update_date == '':
pass
else:
if update_date is None:
update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')
elif datetime.datetime.strptime(stock.update_date, '%Y/%m/%d') > update_date:
update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')
get_update_date()
# 依照我们所要的条件,使用read_sql取出查询结果
df = pd.read_sql('''SELECT ts, Code, Open, High, Low, Close, Volume,
MA5, MA20, MA5_diff, MA20_diff FROM stocks_day_kbars
WHERE ts >= ? AND MA5 > MA20 AND Close >= MA20 AND MA20_diff > 0''',
conn, parse_dates=['ts'], params=(update_date, ))
stock_name = [] #建立股票名称list
# 历遍df,透过shioaji取得股票名称并加入list中
for index, row in df.iterrows():
stock_name.append(api.Contracts.Stocks[row['Code']].name)
df['Name'] = stock_name #增加Name栏位,将栏位值设为stock_name
df = df[['ts', 'Code', 'Name', 'Open', 'High', 'Low', 'Close', 'Volume', 'MA5', 'MA20', 'MA5_diff', 'MA20_diff']] #设定栏位顺序
df.to_excel('recommand.xlsx', encoding="utf_8_sig", index=False) #将结果汇出成Excel
api.logout()
之前介绍DataFrame汇出档案时,都只用to_csv汇出成CSV档,前几天发现其实可以直接用to_excel汇出成xlsx档,不过这个功能会用到openpyxl套件,若没有先安装会导致汇出失败,若要使用to_excel,可以先执行pip install openpyxl
进行安装。
上面这个筛选条件,是用来产生多头排列的股票清单,这里只有用MA5跟MA20做判断,并且只选出MA20_diff这个栏位值大於0且当日收盘价大於等於MA20的股票清单,并且透过shioaji抓股票的名称,最後产生的资料如下:
shioaji在报价及下单这方面功能算是齐全,但还是有一些资料是要使用其它套件来计算及产生。原本想说30天铁人赛有点困难,但实际写完後才对Shioaji有更深入的了解,当然还有好大一部份需要花时间去研究(例如盘中交易策略及即时K线资料产生)。
而目前shioaji还没有盘後资料,这部份只能再找看看是否有其它的套件可以提供,或是直接抓证交所的资料。结论就是要做到真正的程序交易,还有很长的一段路要走,身为初学者的我们,可以先从盘後的资料下手,再一步步进展到盘中的资料。
<<: [从0到1] C#小乳牛 练成基础程序逻辑 Day 30 - 刷题日常 九大练功房 剑指Leetcode
续 Day 12 今天的特别理论和抽象,所以懒得看就跳过吧! 系统模型和现实 (System Mo...
题号:7 标题;Reverse Integer 难度:Medium Given a signed 3...
在系列文章的一开始,我们花了些篇幅介绍一些结构面的轮廓,现在开始要带大家开始进行实作了! 环境需求 ...
接下来这一个章节,焦点还是会在filebeat上,通常在收集log,并不是所有资料都需要收集到Ela...
我们来看到C#要如何计算程序码的执行时间呢 ~ 有两种方法分别是 Stopwatch DateTim...