资料取得 - 多重来源

说是多重来源,其实也就是本机和 shioaji 而已,我的想法是这样子的,如果本机有资料的话,就从本机抓取,如果本机没有的话就连线到 shioaji 抓取,如果是要跑半年线的资料的话,至少可以省下半年的时间,因为我的目标,可能一天要跑 20 支以上的股票,那这样子省下来的时间就多了。以下是大概的架构。

取得资料流程

规画的目录架构如下:

project
│   main.py
│   service.py
│
└───repository
│   │   databse.py
│   │   models.py
│   │   database.sqlite3 (资料库档案)
│   │   api.py (shioaji api)
  • api.py 这部份之前已经使用很多次了,所以就不介绍了,程序码如下
import shioaji as sj
import pandas as pd
from datetime import datetime

PERSON_ID = "身份证字号"
PASSWD = "密码"

api = sj.Shioaji()

def getKbarsFromApi(stock_code, start, end):
    api.login(person_id=PERSON_ID, passwd=PASSWD)

    stock = api.Contracts.Stocks[stock_code]
    kbars = api.kbars(stock ,start=start, end=end)
    
    df = __kbarsConvertToDf(kbars)
    
    api.logout()

    return df


def __kbarsConvertToDf(kbars):
    dts = list(map(lambda x:datetime.utcfromtimestamp(x / 10**9), kbars.ts))

    df = pd.DataFrame(
        {
            "open": pd.Series(kbars.Open),
            "high": pd.Series(kbars.High),
            "low": pd.Series(kbars.Low),
            "close": pd.Series(kbars.Close),
            "volume": pd.Series(kbars.Volume),
        }
    )
    
    df.index = pd.Index(dts)
    
    return df
  • database.py 昨天也示范过了,一样直接上程序码
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///repository/database.sqlite3"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)

session = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
  • models.py 昨天也有示范过,不过这边加了一个 Offdays 的资料,是要纪录休市的日期的,这样子本机捞不到资料,可以判断是不是休市,如果是的话,也不用去 Shioaji 再捞一次了
from sqlalchemy import Column, String, DateTime, Integer
from sqlalchemy.sql.sqltypes import Date, Float
from .database import Base


class Kbars(Base):
    __tablename__ = 'kbars'

    stock_code = Column(String, primary_key=True)
    datetime = Column(DateTime(timezone=False), primary_key=True)
    open = Column(Float(precision=2))
    high = Column(Float(precision=2))
    low = Column(Float(precision=2))
    close = Column(Float(precision=2))
    volume = Column(Integer)

    def __init__(self, stock_code, datetime, open, high, low, close, volume):
        self.stock_code = stock_code
        self.datetime = datetime
        self.open = open
        self.high = high
        self.low = low
        self.close = close
        self.volume = volume

    def __repr__(self):
        return "Stock: {} datetime: {} open:{}|high:{}|low:{}|close:{}|vol:{}".format(
            self.stock_code, self.datetime, self.open, self.high, self.low, self.close, self.volume)


class OffDays(Base):
    __tablename__ = "offDays"

    id = Column(Integer, primary_key=True)
    date = Column(Date)
    
    def __init__(self, date):
        self.date = date

    def __repr__(self):
        return self.date.strftime("%Y-%m-%d")
  • services.py 主要资料取得的逻辑在这个档案,最有可能出错的也是在这里,所以各位参考就好,如果有问题还是要自己处理喔
from datetime import timedelta, datetime
from sqlalchemy.sql.expression import and_
from repository.api import getKbarsFromApi
from repository.models import Kbars, OffDays
from repository.database import session
from sqlalchemy import func
import pandas as pd


class RangeClass:
      '''纪录要捞取时间范围的class'''
    def __init__(self):
        self.start = None
        self.end = None
        
    def __repr__(self):
        return "{{start: {}, end: {}}}".format(
            self.start.strftime("%Y-%m-%d"), 
            "None" if self.end is None else self.end.strftime("%Y-%m-%d"))

def getKbars(stock_code, start, end):
    # 建立 db 连线
    db = session()
    
    # 把 db 里有资料的日期取出来
    dbDates = db.query(func.date(Kbars.datetime))\
        .filter(and_(func.date(Kbars.datetime) >= start, 
                     func.date(Kbars.datetime) <= end, 
                     Kbars.stock_code == stock_code)).distinct().all()

    # 将日期的 string 转成 date 并存成 list
    dbDates = [datetime.strptime(value, "%Y-%m-%d").date() for value, in dbDates]
    
    # 把资料库里的 offDays 资料取出
    offDays = db.query(OffDays.date).filter(and_(OffDays.date >= start, OffDays.date <= end)).all()
    
    # 存成 list
    offDays = [value for value, in offDays]

    # 建立日期的 list
    dateRange = pd.date_range(start = start, end = end).to_pydatetime().tolist()
    
    # 要从 shioaji 取得的 RangeClass tuple
    apiRange = ()
    # db 有资料的 RangeClass tuple (可以不用)
    dbRange = ()

    # api 的 RangeClass
    apiRangeObject = RangeClass()
    # db 的 RangeClass
    dbRangeObject = RangeClass()

    # 检查日期列表,是否在 db 中
    for date in dateRange:

        if date.date() in dbDates:
        # 在 db 里
            if dbRangeObject.start is None:
                # 如果 db 的 RangeClass 开始是空的,就设定开始日期
                # 因为日期是一个区间,所以只要设一次就可了
                dbRangeObject.start = date.date()

                if apiRangeObject.start is not None:
                    # 如果 api 的 RangeClass 开始不是空的, 就设定结束日期, 并放入 apiRange tuple 中
                    apiRangeObject.end = date.date() - timedelta(days = 1)
                    apiRange += (apiRangeObject,)

                    # 重置 api 的 RangeClass
                    apiRangeObject = RangeClass()
        
        else:
        # 不在 db 里,要从 shioaji 取得
            if apiRangeObject.start is None:
                # api RangeClass 开始是空的,设定开始日期
                if date.date() not in offDays:
                    # 判断当天是不是休市,不是休市才进行设定
                    apiRangeObject.start = date.date()

                    if dbRangeObject.start is not None:
                        # 如果 db 的 RangeClass 有开始资料,就设定结束资料,并放入 dbRange tuple 中
                        dbRangeObject.end = date.date() - timedelta(days = 1)
                        dbRange += (dbRangeObject,)
                        
                        # 重置 db 的 RangeClass
                        dbRangeObject = RangeClass()


    # 结束後,检查 db, api 的 RangeClass, 
    # 如果有还没有结束的,就把它结束掉,并放入 tuple 中
    if dbRangeObject.start is not None:
        dbRangeObject.end = dateRange[-1].date()
        dbRange += (dbRangeObject,)
                    
    if apiRangeObject.start is not None:
        apiRangeObject.end = dateRange[-1].date()
        apiRange += (apiRangeObject,)

    # api 的 tuple 有资料
    if len(apiRange) > 0:
        for range in apiRange:
            # 呼叫 shioaji 进行抓取
            kbars = getKbarsFromApi(stock_code, range.start.strftime('%Y-%m-%d'), range.end.strftime('%Y-%m-%d')) 
            
            # 没有资料,把日期存入 offDays
            if len(kbars.index) == 0:
                db.add(OffDays(range.start))
                nextDay = range.start
                while nextDay < range.end:
                    nextDay += timedelta(days=1)
                    db.add(OffDays(nextDay))
            else:
            # 有资料,把资料存到 db
                for index, row in kbars.iterrows():
                    kbar = Kbars(
                        stock_code, index, row["open"], row["high"], row["low"], row["close"], row["volume"]
                    )
                    
                    db.add(kbar)
        

    db.commit()
    
    # 从 db 捞资料并转成 pd.dataframe
    statement = db.query(Kbars.datetime, Kbars.open, Kbars.high, Kbars.low, Kbars.close, Kbars.volume)\
        .filter(and_(func.date(Kbars.datetime) >= start, 
                     func.date(Kbars.datetime) <= end, 
                     Kbars.stock_code == stock_code)).statement
        
        
    df = pd.read_sql(statement, db.bind)
    
    df = df.set_index("datetime")
    
    db.close()

    return df
  • main.py 因为取资料的逻辑都在 services.py,所以 main.py 只要呼叫就可以了
from repository import models
from repository.database import engine
import services

# 这一个是初始化 db 的, 如果是第一次执行的话不能省
models.Base.metadata.create_all(bind=engine)

# 直接呼叫取得资料
kbars = services.getKbars("2412", "2021-10-01", "2021-10-11")

这样就可以取得我们的资料,之後我希望每天进行当日的资料分析,所以明天我们来实做一下如何进行排程


<<:  Day 27 - State Monad II

>>:  信件样式与内容编辑

Day 13. 是时候来规划个专案了,先来个小调查

我的小夥伴推荐我做贪吃蛇但吃的是我,於是就有了以下两种: 不是很完整的想法,希望大家可以从我的文字叙...

判断选取哪个radio button

var item = $('input:radio[name="radio_name&qu...

【程序】Onboarding process 转生成恶役菜鸟工程师避免 Bad End 的 30 件事 - 21

Onboarding process 第一周有哪些重点 每个月定期追踪 第三个月是最危险的 ...

Day3 条件判断

今天来学学Vue里面的判别式v-if 跟v-show 1.v - if 在这里我们将条件设定为Sho...

Day 27 - 上传档案 Carrierwave - 多个档案

昨天说的是单一档案上传,如果要多个档案上传的话... 建立新栏位 资料表先新增可以储存一个阵列的栏位...