当我们要确保资料是否有成功insert,除了使用程序逻辑上的Lock控制还会搭配到使用事务执行流程去管控。
透过以下应用举例,达到控制写入资料&事务流程设计。 过程注解写在代码中
Golang
假设有5笔资料要做写入,流程上如何达到(写语法)与(执行语法)错开的状况,并透过事务流程完成提交。
组语法部分 : 每5s取一笔数据去组insert into .... values (...)的values资料。
执行写入DB部分: 每10s触发一次提交数据。
先看程序执行完成LOG:
昨天先建好的资料:
mysql> select * from user_lists;
+---------+---------+-------+-----------------+-------------+
| user_id | account | level | last_login_time | create_time |
+---------+---------+-------+-----------------+-------------+
| 100001 | siang05 | 1 | 1630385990 | 1330385990 |
| 100002 | siang01 | 1 | 1630385990 | 1330385990 |
| 100003 | siang02 | 1 | 1630385990 | 1330385990 |
| 100004 | siang03 | 1 | 1630385990 | 1330385990 |
| 100005 | siang04 | 1 | 1630385990 | 1330385990 |
+---------+---------+-------+-----------------+-------------+
5 rows in set (0.00 sec)
package main
import (
"database/sql"
"fmt"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
)
func main() {
//Init connect object. content-> account:password@tcp(host_ip:port)/db_name
dbu, err := sql.Open("mysql", "root:1234@tcp(127.0.0.1:3306)/user?charset=utf8&parseTime=True")
if err != nil {
fmt.Println("open mysql error", err)
return
}
//close conn.
defer dbu.Close()
//Create mysql connect.
err = dbu.Ping()
if err != nil {
fmt.Println("create mysql connect error", err)
return
}
/*
>以下连接池设定
注意点:
(1.)mysql连线预设保存时间为8小时,闲置超过8小时会被mysql断开变失效连线。
查询: show variables like '%wait_timeout%';
Q: 使用到被mysql断开的连线会产生ERROR -> packets.go:36: unexpected EOF
(2.)mysql最大连线数。
查询: show variables like '%max_connections%';
*/
//设置最大并发连线数,超过连线数需等待,直到其中连接被释放并变为空闲。
dbu.SetMaxOpenConns(100)
//设置最大的空闲连接数,适当的设定空闲连接数(会占用内存)将提高性能,减少从头建立新连接的可能。
dbu.SetMaxIdleConns(10)
//设置连线的生命周期,过期後无法重用。
dbu.SetConnMaxLifetime(30)
/*
以下内容单纯举例大致流程
重点: 资料写入的流程 or 如果写入发生中断造成失败要怎麽处理後续步骤~ 在於个人设计了...举例上没有多做这部分逻辑处理。
流程上模拟: 今天有5笔资料要做写入,流程上如何达到(写语法)与(执行语法)错开的状况,并透过事务流程完成提交。
在整个流程上 => 组语法部分 : 每5s取一笔数据组insert into .... values (...) 的values资料。
执行写入DB部分: 每10s触发一次提交数据。
*/
//假设有5笔资料要做写入 (写入资料来源)
chanBuf := make(chan string, 5)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100002, "siang01", 1, 1630385990, 1330385990)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100003, "siang02", 1, 1630385990, 1330385990)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100004, "siang03", 1, 1630385990, 1330385990)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100005, "siang04", 1, 1630385990, 1330385990)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100001, "siang05", 1, 1630385990, 1330385990)
fmt.Printf("目前等待写入笔数: %d\n", len(chanBuf))
go GetSqlData(chanBuf)
go ExecTransaction(dbu)
//阻塞主线程防提早执行完,确认5笔资料都完成才退出。
for {
if checker == 5 {
fmt.Printf("All finish")
break
}
}
}
var locks sync.Mutex
var values string //组语法值的部分
var checker int64 //检查笔数
//GetSqlData 组合写入资料内容
func GetSqlData(data chan string) {
for {
//为了能看出流程怎麽走才加的
time.Sleep(5 * time.Second)
//读取data
info := <-data
//防止定时执行写入机制触发时还在组写入资料
locks.Lock()
values += info
locks.Unlock()
}
}
//RollbackTX 回滚事务释放连线
func RollbackTX(tx *sql.Tx) error {
err := tx.Rollback()
if err != sql.ErrTxDone && err != nil {
fmt.Println("[ERROR] tx rollback error", err)
return err
}
return nil
}
//ExecTransaction 执行事务提交流程
func ExecTransaction(dbu *sql.DB) {
//INSERT组资料的来源看个人设计, 这边locks机制是为了确保组合SQL的部分不会再有资料写入
insert := "INSERT INTO user_lists(user_id,account,level,last_login_time,create_time) values"
//设置计时器每10秒触发一次写入流程
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
if values != "" {
//处理语法字串的最後一个逗点变成结束符号(, -> ;)
if values[len(values)-1:] == "," {
values = values[0:len(values)-1] + ";"
}
//完整写入语法
sql := insert + values
fmt.Println("[READY] 本次执行语法: ", sql)
fmt.Println("[START] 进入Transaction 流程")
//lock -> 禁止values继续写入新资料
locks.Lock()
//进入交易模式
fmt.Println("[START] 进入Begin 流程")
tx, beginErr := dbu.Begin()
if beginErr != nil {
if errs := RollbackTX(tx); errs != nil {
fmt.Println("[ERROR] Begin rollback error: ", errs)
}
fmt.Println("[ERROR] Begin error: ", beginErr)
values = ""
locks.Unlock()
continue
}
//执行insert动作
fmt.Println("[START] 进入Exec SQL 流程")
rows, execErr := tx.Exec(sql)
if execErr != nil {
if errs := RollbackTX(tx); errs != nil {
fmt.Println("[ERROR] Exec rollback error: ", errs)
}
fmt.Println("[ERROR] Exec error: ", execErr)
values = ""
locks.Unlock()
continue
}
//提交事务
fmt.Println("[START] 进入Commit 流程")
if CommitErr := tx.Commit(); CommitErr != nil {
if errs := RollbackTX(tx); errs != nil {
fmt.Println("[ERROR] Commit rollback error: ", errs)
}
fmt.Println("[ERROR] Commit error: ", CommitErr)
values = ""
locks.Unlock()
continue
}
//检查笔数目前已经到几笔了(返回笔数)
check, _ := rows.RowsAffected()
fmt.Printf("[LOG] 目前完成笔数: %d\n", checker+check)
checker += check
//将写入成功的数据清除
values = ""
//解锁-> values可以继续写入
locks.Unlock()
fmt.Println("[OK] 完成本次作业")
}
}
}
明天来介绍mysql重要的索引 !!
>>: Day 12 AWS云端实作起手式第二弹 开始拼拼图吧
问题描述 资料库栏位型态里面设定的型态是varchar 但在php里面取值时(如下面程序码),有可能...
前一篇介绍了如何运用 Laravel 框架设计模式规划大型专案,当中有提到Model,今天就来介绍这...
前言 之後几天会拿来做一个 app~ 在查了一些资料之後, 整理了 开发 APP 的步骤。 开发步骤...
以下是《数字身份准则》(NIST SP 800-63-3)的摘录: . 在数字认证申请人拥有并控制一...
AJAX指的是局部更新页面的技术,例如按了赞之後图示会变成实心的,按赞数也会增加之类的。 这个技术是...