Day 30 - 每日产生观察名单

本篇重点

  • 每日下载及更新Kbar资料
  • 每日产生观察名单
  • 结论

每日下载及更新Kbar资料

每天下载及更新K线资料,实际上做起来不比之前的程序简单,因为还要考虑已存在DB中的资料不抓,以免资料重覆导致计算错误。另外要说明,这里INSERT资料到资料库的动作,一样是用pandas的to_sql,因为之前在建立Table时已经有产生index,所以在这里必须传入index=False这个参数,如果没有这个参数,会因为DB中已经存在index而产生错误。

下载及更新1分K资料

首先说明每天下载1分K资料并储存至DB中,请注意此动作请在收盘後再做,因为在盘中是抓得到当天的kbar资料,下列的程序并没有考虑收盘前抓资料的情况。
程序范例如下:

import os, datetime
import shioaji as sj
from shioaji.constant import Exchange
from dotenv import load_dotenv
import pandas as pd
import re #汇入re模组
import sqlite3

update_date = None

load_dotenv('D:/python/shioaji/.env')
conn = sqlite3.connect('C:/shioaji.db')

api = sj.Shioaji()
api.login(
    person_id=os.getenv('YOUR_PERSON_ID'), 
    passwd=os.getenv('YOUR_PASSWORD')
)
print(api.Contracts.Stocks) #等待Contract.Stocks Fetch完成
# 从Contracts中,抓到目前最新的update_date日期
def get_update_date():
    global update_date
    for exchange in api.Contracts.Stocks:
        for stock in exchange:
            if stock.exchange in (Exchange.TSE, Exchange.OTC):   
                if stock.category == '00' or stock.category == '' or stock.update_date == '':
                    pass
                else:
                    if update_date is None:
                        update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')
                    elif datetime.datetime.strptime(stock.update_date, '%Y/%m/%d') > update_date:
                        update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')

#传入股票代码,抓kbar资料并存至DB中
def update_stock_kbar(stock_code):
    #预设start_date为2018-12-07
    start_date = datetime.datetime.strptime('2018-12-07', '%Y-%m-%d')
    cursor = conn.cursor()
    # 先从1分K的Table中,抓该档股票最後的ts
    cursor.execute('SELECT ts FROM stocks_1min_kbars WHERE Code = ? ORDER BY ts DESC LIMIT 1', (stock_code, ))
    query_result = cursor.fetchone()
    if query_result is not None:
        start_date = datetime.datetime.strptime(query_result[0], '%Y-%m-%d %H:%M:%S')
        start_date = start_date.date()+datetime.timedelta(days=1)
    kbars = api.kbars(stock, start=start_date.strftime("%Y-%m-%d"), 
        end=update_date.strftime("%Y-%m-%d"))
    df = pd.DataFrame({**kbars})
    df.ts = pd.to_datetime(df.ts)
    df['Code'] = stock.code
    df.to_sql('stocks_1min_kbars', conn, if_exists='append', index=False)

get_update_date() #先执行一次get_update_date,取得最近一次的update_date

for exchange in api.Contracts.Stocks:
    for stock in exchange:
        if stock.exchange in (Exchange.TSE, Exchange.OTC):   
            if stock.category == '00' or stock.category == '':
                pass
            elif stock.update_date != update_date.strftime("%Y/%m/%d"):
                pass
            elif re.search('[A-Z]', stock.code) is None:
                print(f'start download {stock.code} {stock.name} kbar data...')
                update_stock_kbar(stock_code=stock.code)
                print(f'{stock.code} {stock.name} kbar data is stored to sqlite')

api.logout()

更新日K资料

抓完1分K,接着是就从1分K的资料产生出日K资料。这里要注意的是在读取1分K资料时,有先将dataFrame的index设为ts栏位,这样才可以把资料resample为日K资料,所以在执行to_sql前,要再记得执行reset_index重设index,这样to_sql後ts栏位资料才会正常写入资料库中。
程序范例如下:

import pandas as pd
import sqlite3
import datetime

conn = sqlite3.connect('C:/shioaji.db')
cursor = conn.cursor() #产生cursor物件

# 传入股票代码,产生日K资料并存至资料库
def update_day_kbar(stock_code):
    print(f'update day kbars for stock:{stock_code}')
    start_date = datetime.datetime.strptime('2018-12-07', '%Y-%m-%d') #预设start_date为2018-12-07
    # 从day_kbars中,找该档股票最後的ts
    cursor.execute('SELECT ts FROM stocks_day_kbars WHERE Code = ? ORDER BY ts DESC LIMIT 1', (stock_code, ))
    query_result = cursor.fetchone()
    if query_result is not None:
        start_date = datetime.datetime.strptime(query_result[0], '%Y-%m-%d %H:%M:%S')
        start_date = start_date.date()+datetime.timedelta(days=1)
    df = pd.read_sql('''SELECT Code, Open, High, Low, Close, Volume, ts 
                FROM stocks_1min_kbars WHERE Code = ? AND ts > ?''', 
                conn, parse_dates=['ts'], index_col=['ts'], params=(stock_code, start_date)) 
    df_day_kbar = df.resample(rule='1D').agg({
        'Code': 'first',
        'Open': 'first', 
        'High': 'max', 
        'Low': 'min', 
        'Close': 'last',
        'Volume': 'sum'})
    df_day_kbar.dropna(axis=0, inplace=True)
    df_day_kbar.reset_index(inplace=True) #重设index
    df_day_kbar.to_sql('stocks_day_kbars', conn, if_exists='append', index=False)
    print(f'stock:{stock_code}, day kbars is update to DB...')

codes = cursor.execute('SELECT DISTINCT code FROM stocks_1min_kbars').fetchall()
for code in codes:
    update_day_kbar(code[0])

conn.close() #关闭资料库连线

重新计算均线相关资料

产生完日K资料後,要再重新计算一次均线相关资料。原本是想要写在上面的产生日K资料的程序里,但发现程序会变得太复杂,所以索性把这部份拆成另一个程序。
程序范例如下:

import pandas as pd
import sqlite3
from statistics import mean

conn = sqlite3.connect('C:/shioaji.db') #建立资料库连线
cursor = conn.cursor() #产生cursor物件

# 更新均线相关资料
def update_MA_data(stock_code):
    cursor.execute('''SELECT ts FROM stocks_day_kbars 
                    WHERE Code = ? AND MA5 IS NOT NULL 
                    ORDER BY ts DESC LIMIT 1''', (stock_code, ))
    query_results = cursor.fetchone()
    if query_results is not None:
        start_ts = query_results[0]
        # 计算MA5
        cursor.execute('''SELECT ts FROM stocks_day_kbars 
                        WHERE Code = ? AND ts > ? AND MA5 IS NULL''', (stock_code, start_ts))
        for data in cursor.fetchall():
            cursor.execute('''SELECT ts, Close, MA5 
                            FROM stocks_day_kbars 
                            WHERE Code = ? AND ts <= ? 
                            ORDER BY ts DESC LIMIT 5''', (stock_code, data[0]))
            close_data = [] #收盘价list
            last_ma5 = None
            for ma_data in cursor.fetchall():
                close_data.append(ma_data[1])
                if last_ma5 is None:
                    last_ma5 = ma_data[2]
            ma5 = round(mean(close_data), 2) #用statistics.mean计算均价,并取至小数点後第二位
            ma5_diff = round(ma5-last_ma5, 2) #计算MA5的差异数,并取至小数点後第二位
            cursor.execute('''UPDATE stocks_day_kbars SET MA5 = ?, MA5_diff = ? 
                            WHERE Code = ? AND ts = ?''', (ma5, ma5_diff, stock_code, data[0]))
            conn.commit() #执行上面的update sql
        #计算MA20
        cursor.execute('''SELECT ts FROM stocks_day_kbars
                        WHERE Code = ? AND ts > ? AND MA20 IS NULL''', (stock_code, start_ts))
        for data in cursor.fetchall():
            cursor.execute('''SELECT ts, Close, MA20 FROM stocks_day_kbars 
                WHERE Code = ? AND ts <= ? ORDER BY ts DESC LIMIT 20''', (stock_code, data[0]))
            close_data = []
            last_ma20 = None
            for ma_data in cursor.fetchall():
                close_data.append(ma_data[1])
                if last_ma20 is None:
                    last_ma20 = ma_data[2]
            ma20 = round(mean(close_data), 2)
            ma20_diff = round(ma20-last_ma20, 2)
            cursor.execute('''UPDATE stocks_day_kbars SET MA20 = ?, MA20_diff = ? 
                        WHERE Code = ? AND ts = ?''', (ma20, ma20_diff, stock_code, data[0]))
            conn.commit() #执行上面的update sql

codes = cursor.execute('SELECT DISTINCT code FROM stocks_1min_kbars').fetchall()
for code in codes:
    print(f'update {code[0]} MA data...')
    update_MA_data(code[0])

conn.close() #关闭资料库连线

每日产生观察名单

最後,就是从日K资料中,产生观察名单。第一步我会先用DB Browser for SQLite去撰写SQL并看取出来的资料是不是我要的。撰写完後,按下上方的执行按钮,若SQL语法没有问题,下方就会跑出结果。
https://ithelp.ithome.com.tw/upload/images/20211014/20140827IdZAuxykWf.png
有了初步的资料後,再使用python让产生出来的资料更加完整。程序范例如下:

import os, datetime
import shioaji as sj
from shioaji.constant import Exchange
from dotenv import load_dotenv
import pandas as pd
import sqlite3

update_date = None

load_dotenv('D:/python/shioaji/.env')
conn = sqlite3.connect('C:/shioaji.db') #在D槽底下建立资料库

api = sj.Shioaji()
api.login(
    person_id=os.getenv('YOUR_PERSON_ID'), 
    passwd=os.getenv('YOUR_PASSWORD')
)
print(api.Contracts.Stocks) #等待Contract.Stocks Fetch完成

def get_update_date():
    global update_date
    for exchange in api.Contracts.Stocks:
        for stock in exchange:
            if stock.exchange in (Exchange.TSE, Exchange.OTC):   
                if stock.category == '00' or stock.category == '' or stock.update_date == '':
                    pass
                else:
                    if update_date is None:
                        update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')
                    elif datetime.datetime.strptime(stock.update_date, '%Y/%m/%d') > update_date:
                        update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')

get_update_date()
# 依照我们所要的条件,使用read_sql取出查询结果
df = pd.read_sql('''SELECT ts, Code, Open, High, Low, Close, Volume, 
                    MA5, MA20, MA5_diff, MA20_diff FROM stocks_day_kbars 
                    WHERE ts >= ? AND MA5 > MA20 AND Close >= MA20 AND MA20_diff > 0''',
                    conn, parse_dates=['ts'], params=(update_date, ))
stock_name = [] #建立股票名称list
# 历遍df,透过shioaji取得股票名称并加入list中
for index, row in df.iterrows():
    stock_name.append(api.Contracts.Stocks[row['Code']].name)
df['Name'] = stock_name #增加Name栏位,将栏位值设为stock_name
df = df[['ts', 'Code', 'Name', 'Open', 'High', 'Low', 'Close', 'Volume', 'MA5', 'MA20', 'MA5_diff', 'MA20_diff']] #设定栏位顺序
df.to_excel('recommand.xlsx', encoding="utf_8_sig", index=False) #将结果汇出成Excel

api.logout()

之前介绍DataFrame汇出档案时,都只用to_csv汇出成CSV档,前几天发现其实可以直接用to_excel汇出成xlsx档,不过这个功能会用到openpyxl套件,若没有先安装会导致汇出失败,若要使用to_excel,可以先执行pip install openpyxl进行安装。
上面这个筛选条件,是用来产生多头排列的股票清单,这里只有用MA5跟MA20做判断,并且只选出MA20_diff这个栏位值大於0且当日收盘价大於等於MA20的股票清单,并且透过shioaji抓股票的名称,最後产生的资料如下:
https://ithelp.ithome.com.tw/upload/images/20211015/20140827mL0JwAirfw.png

结论

shioaji在报价及下单这方面功能算是齐全,但还是有一些资料是要使用其它套件来计算及产生。原本想说30天铁人赛有点困难,但实际写完後才对Shioaji有更深入的了解,当然还有好大一部份需要花时间去研究(例如盘中交易策略及即时K线资料产生)。
而目前shioaji还没有盘後资料,这部份只能再找看看是否有其它的套件可以提供,或是直接抓证交所的资料。结论就是要做到真正的程序交易,还有很长的一段路要走,身为初学者的我们,可以先从盘後的资料下手,再一步步进展到盘中的资料。


<<:  [从0到1] C#小乳牛 练成基础程序逻辑 Day 30 - 刷题日常 九大练功房 剑指Leetcode

>>:  Wrapping up

Trouble with Distributed Systems (4-2) - System Model & Summary

续 Day 12 今天的特别理论和抽象,所以懒得看就跳过吧! 系统模型和现实 (System Mo...

找LeetCode上简单的题目来撑过30天啦(DAY26)

题号:7 标题;Reverse Integer 难度:Medium Given a signed 3...

[Day6] - Django 起手式

在系列文章的一开始,我们花了些篇幅介绍一些结构面的轮廓,现在开始要带大家开始进行实作了! 环境需求 ...

Day05 Filebeat(三) 正则表达式

接下来这一个章节,焦点还是会在filebeat上,通常在收集log,并不是所有资料都需要收集到Ela...

【C#】计算程序的执行时间

我们来看到C#要如何计算程序码的执行时间呢 ~ 有两种方法分别是 Stopwatch DateTim...