Day 0x1B - odoo addons 永丰金流开发(Part 2 - sinopac sdk... maybe)

*** 模组资料夹 payment_sinopac 以 "/" 来代表此资料夹 ***

0x1 永丰金流的sdk...吧

说实在要改的还太多,但这里还是先放出来,
也没什麽时间修正了,铁人赛结束了还是会持续的修正 sdk

OK,那先建立 sdk 的 controller
在模组目录先建立 controller 资料夹
接着建立 __init__.py
/controller/__init__.py

# -*- coding: utf-8 -*-
from . import sinopac_sdk

然後建立 sinopac_sdk.py
这个基本上是参考 laravel 当时写的 sdk,
翻过来後也有测试是正常的
/controller/sinopac_sdk.py

import requests as req
import hashlib
import logging
from json import loads, dumps
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

_logger = logging.getLogger(__name__)
href_origin = 'https://sandbox.sinopac.com/QPay.WebAPI'


def str2int(x):
    return int(x, 16)


class SinopacSDK:
    shop_no = False
    nonce = False
    key_a1 = False
    key_a2 = False
    key_b1 = False
    key_b2 = False

    def __init__(self, **params):
        """
        @params str shop_no: 商店代码
        @params str key_a1: 官方提供 hash key a1
        @params str key_a2: 官方提供 hash key a2
        @params str key_b1: 官方提供 hash key b1
        @params str key_b2: 官方提供 hash key b2
        """
        names = ['shop_no', 'key_a1', 'key_a2', 'key_b1', 'key_b2']
        for n in names:
            if not params.get(n):
                raise Exception(f'Not given {n}')
            else:
                self.__setattr__(n, params.get(n))

    def get_nonce(self) -> str:
        url = f'{href_origin}/api/Nonce'
        header = {
            'Content-type': 'application/json'
        }
        data = dumps({
            'ShopNo': self.shop_no
        })
        response = req.post(url=url, headers=header, data=data)

        if response.status_code >= 500:
            raise Exception('Server Error')
        elif response.status_code >= 400:
            raise Exception('Self Error')
        elif response.status_code != 200:
            raise Exception(f'Error Status: {response.status_code}')

        content = loads(response.content)
        if not content or not content.get('Nonce'):
            raise Exception('Get Nonce Failure')

        self.nonce = content.get('Nonce')
        return self.nonce

    def calculate_hash_id(self) -> str:
        key_length = len(self.key_a1)
        a = hex(str2int(self.key_a1) ^ str2int(self.key_a2))[2:]
        b = hex(str2int(self.key_b1) ^ str2int(self.key_b2))[2:]
        return f'{a:0>{key_length}}{b:0>{key_length}}'.upper()

    def sha256(self, string: str) -> str:
        return hashlib.sha256(string.encode('utf-8')).hexdigest().upper()

    def calculate_sign(self, data: dict, nonce: str, hash_id: str) -> str:
        uppercase_keys = sorted([k.upper() for k in data])
        new_dict = {}
        for key, value in data.items():
            if type(value) in [list, dict]:
                continue

            if value == '' or value is False:
                continue

            new_dict[uppercase_keys.index(key.upper())] = f'{key}={value}'

        message = '&'.join([new_dict[i] for i in sorted(new_dict)]) + nonce + hash_id
        return self.sha256(message)

    def calculate_iv(self, nonce: str) -> str:
        if not nonce:
            nonce = self.nonce or self.get_nonce()

        return self.sha256(nonce)[-16:]

    def aes_cipher(self, hash_id: str, iv: str):
        byte_key = bytes(hash_id.encode('utf8'))
        byte_iv = bytes(iv.encode('utf8'))
        return AES.new(key=byte_key, mode=AES.MODE_CBC, iv=byte_iv)

    def encrypt_message(self, data: dict, hash_id: str, iv: str):
        cipher = self.aes_cipher(hash_id, iv)

        json_data = dumps(data).encode('utf8')
        encrypt_message = cipher.encrypt(pad(json_data, AES.block_size))
        return encrypt_message.hex().upper()

    def decrypt_message(self, encrypt_message: str, hash_id: str, iv: str):
        cipher = self.aes_cipher(hash_id, iv)
        encrypt_message = bytes.fromhex(encrypt_message)
        message = unpad(cipher.decrypt(encrypt_message), AES.block_size)
        return loads(message)

    def call_api(self, url: str, data: dict):
        header = {
            'Content-type': 'application/json'
        }

        response = req.post(url=url, headers=header, data=dumps(data))
        data = loads(response.content)

        if 'Nonce' not in data:
            _logger.warning('Reply message haven\'t Nonce. JSON: %s', response.content.decode('utf8'))
            return {}

        hash_id = self.calculate_hash_id()
        dec_data = self.decrypt_message(
            encrypt_message=data['Message'],
            hash_id=hash_id,
            iv=self.calculate_iv(data['Nonce'])
        )

        if dec_data.get('Status', '') != 'S':
            _logger.warning('订单建立失败,原因: %s', dec_data.get('Description', ''))
            return {}

        sign = self.calculate_sign(
            data=dec_data,
            hash_id=hash_id,
            nonce=data['Nonce']
        )

        if sign != data['Sign']:
            _logger.error('验证错误,内文签章不同. JSON: %s', response.content)
            return {}

        return dec_data

0x2 今日结语

基本上依照之前踩过的坑都考虑进去了
但在 sign 那段卡了一阵子
莫名其妙 python 跟 php sha256 算出来的不一样
去上个厕所回来再按一次就正常了

真的很奇妙...
明天把主要Controller 新增 route 的部分丢上来
目前还卡在底层的流程厘清跟实现,希望可以顺利完成


<<:  近似最短路径 (3)

>>:  企业资料通讯 Week3

Day12 - Google Kubernetes Engine 基础 - Pod 建置

前言 前一天我们建立好了 Kubernetes 的环境,今天就来实际使用看看,将应用程序透过 Pod...

Day 17 AWS云端实作起手式第七弹 让开机器变得很自动自发Auto Scaling-ReadNode设置

关於Auto Scaling的建置,我们预计会花两到三天的时间来做比较详细说明。 参考Udemy A...

小白签到: 纪录python学习day 1

#背景: 有理工宅的身形但目前尚无无程序内涵的python初心者,本科系跟理工完全无关的非理工科系毕...

Flutter在Android模拟器无法安装release apk原因

Android模拟器在安装由Flutter APP产生的app-release.apk遇到 the ...

信件

Laravel 支援信件寄送功能,可以经由各种服务发送信件,框架也预设有使用者登入时就寄送验证信的功...