Day.17 应用中学习 - 实务操作资料库写入 ( golang / sql )

当我们要确保资料是否有成功insert,除了使用程序逻辑上的Lock控制还会搭配到使用事务执行流程去管控。

透过以下应用举例,达到控制写入资料&事务流程设计。 过程注解写在代码中/images/emoticon/emoticon33.gif

  • 程序语言

Golang

  • 流程模拟

假设有5笔资料要做写入,流程上如何达到(写语法)与(执行语法)错开的状况,并透过事务流程完成提交。

  • 流程设计

组语法部分 : 每5s取一笔数据去组insert into .... values (...)的values资料。
执行写入DB部分: 每10s触发一次提交数据。

先看程序执行完成LOG:

https://ithelp.ithome.com.tw/upload/images/20210913/20130880AkiZ367mqE.png


昨天先建好的资料:

mysql> select * from user_lists;
+---------+---------+-------+-----------------+-------------+
| user_id | account | level | last_login_time | create_time |
+---------+---------+-------+-----------------+-------------+
|  100001 | siang05 |     1 |      1630385990 |  1330385990 |
|  100002 | siang01 |     1 |      1630385990 |  1330385990 |
|  100003 | siang02 |     1 |      1630385990 |  1330385990 |
|  100004 | siang03 |     1 |      1630385990 |  1330385990 |
|  100005 | siang04 |     1 |      1630385990 |  1330385990 |
+---------+---------+-------+-----------------+-------------+
5 rows in set (0.00 sec)

  • 温馨提醒:
    部分程序逻辑在於个人设计,为了方便呈现。在实际运用上要做修改喔/images/emoticon/emoticon31.gif
    ps. 复制贴上比较好观看
package main

import (
	"database/sql"
	"fmt"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

func main() {

	//Init connect object.  content-> account:password@tcp(host_ip:port)/db_name
	dbu, err := sql.Open("mysql", "root:1234@tcp(127.0.0.1:3306)/user?charset=utf8&parseTime=True")
	if err != nil {
		fmt.Println("open mysql error", err)
		return
	}

	//close conn.
	defer dbu.Close()

	//Create mysql connect.
	err = dbu.Ping()
	if err != nil {
		fmt.Println("create mysql connect  error", err)
		return
	}

	/*
	  >以下连接池设定
	      注意点:
	   (1.)mysql连线预设保存时间为8小时,闲置超过8小时会被mysql断开变失效连线。
	    查询: show variables like '%wait_timeout%';

	    Q: 使用到被mysql断开的连线会产生ERROR -> packets.go:36: unexpected EOF

	   (2.)mysql最大连线数。
	    查询: show variables like '%max_connections%';

	*/

	//设置最大并发连线数,超过连线数需等待,直到其中连接被释放并变为空闲。
	dbu.SetMaxOpenConns(100)
	//设置最大的空闲连接数,适当的设定空闲连接数(会占用内存)将提高性能,减少从头建立新连接的可能。
	dbu.SetMaxIdleConns(10)
	//设置连线的生命周期,过期後无法重用。
	dbu.SetConnMaxLifetime(30)

	/*
		以下内容单纯举例大致流程

		重点: 资料写入的流程 or 如果写入发生中断造成失败要怎麽处理後续步骤~ 在於个人设计了...举例上没有多做这部分逻辑处理。

		流程上模拟: 今天有5笔资料要做写入,流程上如何达到(写语法)与(执行语法)错开的状况,并透过事务流程完成提交。

		在整个流程上 => 组语法部分    : 每5s取一笔数据组insert into .... values (...) 的values资料。
		              执行写入DB部分: 每10s触发一次提交数据。

	*/

	//假设有5笔资料要做写入 (写入资料来源)
	chanBuf := make(chan string, 5)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100002, "siang01", 1, 1630385990, 1330385990)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100003, "siang02", 1, 1630385990, 1330385990)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100004, "siang03", 1, 1630385990, 1330385990)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100005, "siang04", 1, 1630385990, 1330385990)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100001, "siang05", 1, 1630385990, 1330385990)

	fmt.Printf("目前等待写入笔数: %d\n", len(chanBuf))

	go GetSqlData(chanBuf)
	go ExecTransaction(dbu)

	//阻塞主线程防提早执行完,确认5笔资料都完成才退出。
	for {
		if checker == 5 {
			fmt.Printf("All finish")
			break
		}
	}
}

var locks sync.Mutex
var values string //组语法值的部分
var checker int64 //检查笔数

//GetSqlData 组合写入资料内容
func GetSqlData(data chan string) {
	for {

		//为了能看出流程怎麽走才加的
		time.Sleep(5 * time.Second)

		//读取data
		info := <-data

		//防止定时执行写入机制触发时还在组写入资料
		locks.Lock()
		values += info
		locks.Unlock()
	}
}

//RollbackTX 回滚事务释放连线
func RollbackTX(tx *sql.Tx) error {
	err := tx.Rollback()
	if err != sql.ErrTxDone && err != nil {
		fmt.Println("[ERROR] tx rollback error", err)
		return err
	}
	return nil
}

//ExecTransaction 执行事务提交流程
func ExecTransaction(dbu *sql.DB) {

	//INSERT组资料的来源看个人设计, 这边locks机制是为了确保组合SQL的部分不会再有资料写入
	insert := "INSERT INTO user_lists(user_id,account,level,last_login_time,create_time) values"

	//设置计时器每10秒触发一次写入流程
	ticker := time.NewTicker(10 * time.Second)
	for range ticker.C {

		if values != "" {

			//处理语法字串的最後一个逗点变成结束符号(, -> ;)
			if values[len(values)-1:] == "," {
				values = values[0:len(values)-1] + ";"
			}

			//完整写入语法
			sql := insert + values
			fmt.Println("[READY] 本次执行语法: ", sql)

			fmt.Println("[START] 进入Transaction 流程")

			//lock -> 禁止values继续写入新资料
			locks.Lock()

			//进入交易模式
			fmt.Println("[START] 进入Begin 流程")

			tx, beginErr := dbu.Begin()
			if beginErr != nil {

				if errs := RollbackTX(tx); errs != nil {
					fmt.Println("[ERROR] Begin rollback error: ", errs)
				}
				fmt.Println("[ERROR] Begin error: ", beginErr)
				values = ""
				locks.Unlock()
				continue
			}

			//执行insert动作
			fmt.Println("[START] 进入Exec SQL 流程")

			rows, execErr := tx.Exec(sql)
			if execErr != nil {

				if errs := RollbackTX(tx); errs != nil {
					fmt.Println("[ERROR] Exec rollback error: ", errs)
				}
				fmt.Println("[ERROR] Exec error: ", execErr)
				values = ""
				locks.Unlock()
				continue
			}

			//提交事务
			fmt.Println("[START] 进入Commit 流程")

			if CommitErr := tx.Commit(); CommitErr != nil {

				if errs := RollbackTX(tx); errs != nil {
					fmt.Println("[ERROR] Commit rollback error: ", errs)
				}
				fmt.Println("[ERROR] Commit error: ", CommitErr)
				values = ""
				locks.Unlock()
				continue
			}

			//检查笔数目前已经到几笔了(返回笔数)
			check, _ := rows.RowsAffected()
			fmt.Printf("[LOG] 目前完成笔数: %d\n", checker+check)

			checker += check

			//将写入成功的数据清除
			values = ""
			//解锁-> values可以继续写入
			locks.Unlock()

			fmt.Println("[OK] 完成本次作业")
		}
	}
}

明天来介绍mysql重要的索引 !! /images/emoticon/emoticon37.gif


<<:  Day 10 - Spring Boot 建立专案

>>:  Day 12 AWS云端实作起手式第二弹 开始拼拼图吧

PHP 资料库取值字串型态问题与解决纪录

问题描述 资料库栏位型态里面设定的型态是varchar 但在php里面取值时(如下面程序码),有可能...

Day 4 - 介绍Laravel Eloquent ORM

前一篇介绍了如何运用 Laravel 框架设计模式规划大型专案,当中有提到Model,今天就来介绍这...

Day21:开发自己的 APP 的前置步骤

前言 之後几天会拿来做一个 app~ 在查了一些资料之後, 整理了 开发 APP 的步骤。 开发步骤...

数字认证(Digital Authentication)

以下是《数字身份准则》(NIST SP 800-63-3)的摘录: . 在数字认证申请人拥有并控制一...

【左京淳的JAVA WEB学习笔记】第七章 AJAX与JSON格式

AJAX指的是局部更新页面的技术,例如按了赞之後图示会变成实心的,按赞数也会增加之类的。 这个技术是...