0%

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static function lotto($weight = array())
{
$roll = sprintf("%.2f", mt_rand() / mt_getrandmax() * (array_sum($weight)));
$_tmpW = 0;
$rollnum = 0;
foreach ($weight as $k => $v) {
$min = $_tmpW;
$_tmpW += $v;
$max = $_tmpW;
if ($roll > $min && $roll <= $max) {
$rollnum = $k;
break;
}
}
if ($rollnum == 0 && !is_string($rollnum)) {
return self::lotto($weight);
}
return $rollnum;
}
$lottoArr[1] = 40; //要随机的id =》权重
$lottoArr[2] = 20;
$lottoArr[3] = 40;
$randId = self::lotto($lottoArr); //开始随机

只是为了测试,具体可以参考其他开源软件的写法

原理都是动态创建数据库,导入基础sql,包含管理员信息。然后生成一个install.lock的文件,下次进来判断有这个文件,证明是安装过了

html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>安装程序</title>
</head>
<body>
<h1>安装程序</h1>
<form action="install.php" method="post">
<label for="dbHost">数据库主机:</label>
<input type="text" id="dbHost" name="dbHost" required><br><br>
<label for="dbHost">数据库端口:</label>
<input type="number" id="dbPort" name="dbPort" required><br><br>
<label for="dbName">数据库名:</label>
<input type="text" id="dbName" name="dbName" required><br><br>
<label for="dbUser">数据库用户:</label>
<input type="text" id="dbUser" name="dbUser" required><br><br>
<label for="dbPassword">数据库密码:</label>
<input type="password" id="dbPassword" name="dbPassword"><br><br>

<input type="submit" value="安装">
</form>
</body>
</html>

php代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?php
$hostname = $_POST['dbHost'] ?? 'localhost';
$dbPort = $_POST['dbPort'] ?? 3306;
$username = $_POST['dbUser'] ?? '';
$password = $_POST['dbPassword'] ?? '';
$dbName = $dbName ?? '';

// 数据库连接 DSN
$dsn = "mysql:host=$hostname;port=$dbPort";

try {
// 创建一个新的 PDO 实例
$db = new PDO($dsn, $username, $password);
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// 动态创建数据库
$createDbSql = "CREATE DATABASE IF NOT EXISTS `$dbName` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";
$db->exec($createDbSql);

// 选择数据库
$db->exec("USE `$dbName`");

// 创建管理员表,根据需要插入数据,或者直接导入sql
$createTableSql = "CREATE TABLE IF NOT EXISTS users (
id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(30) NOT NULL,
password VARCHAR(255) NOT NULL,
email VARCHAR(50),
reg_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)";
$db->exec($createTableSql);

echo "安装成功,数据库和表已创建。";
} catch (PDOException $e) {
$err = "安装失败: " . $e->getMessage();
echo $err;
}
?>

先看一个代码

1
2
3
4
5
6
7
$num = 0;

try {
echo 1 / $num;
} catch (Exception $e) {
echo $e->getMessage();
}

这时候得catch是无法捕获除数为0得错误

修复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php
function errorHandler($errno, $errstr, $errfile, $errline) {
// 检查错误类型是否为除以零
if ($errno == E_WARNING && strpos($errstr, 'Division by zero') !== false) {
throw new Exception('Division by zero error');
}
// 可以在这里处理其他类型的错误
}

// 设置自定义错误处理函数
set_error_handler('errorHandler');

$num = 0;

try {
echo 1 / $num;
} catch (Exception $e) {
echo $e->getMessage(); // 这将输出 "Division by zero error"
}

这种是市面上比较常用的,但是需要数据库存储。或者自己写一套加解密的方法,根据code进行解密,效率更高

直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
function shortUrl($url)
{
$charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
$key = 'this-is-salt'; // 加盐
$timestamp = time(); // 时间戳
$random = mt_rand(); // 随机数
$urlHash = md5($key . $url . $random . $url);
// 只使用哈希值的一部分来生成短链接
$shortUrl = '';
for ($i = 0; $i < 6; $i++) {
// 取哈希值的某一部分并进行模运算,然后转换为字符
$index = hexdec(substr($urlHash, $i, 1)) % strlen($charset);
$shortUrl .= $charset[$index];
}

return $shortUrl;
}

$input = 'https://detail.tmall.com/item.htm?spm=a21bo.jianhua/a.201876.d2.5af92a89Ifuxtc&id=749045568815&xxc=ad_ct&priceTId=2147802817238772080956563e4b72&pisk=fsCEs5qKrWFedmXkbhOy7kLPS-RphIEjY_tWrabkRHxnOXilQZIyAzepOO-PjG-h4aUpr3jl436QCS_dJQducQPbGwQLY3l1YUvkIzYDz2fj0wncJQdu49XRseQd0bjKJuJu7CYyz0AkKpVMjhLWqDjkxdmMuEdkqgAu7hYW-DDkE3DMjEhcIwbo_E93unLdbjE6vp-c7-hoaacXKnj2jbqc_ervmwxZZbxLlPQF7wZEMNOdgi8fA5cF0ZCObFj0alRRXTjkzGVxVBBfvspNA-cGSCKMnTva-WbwTHJ25KuSpBWlv_JdL4UASBjOHnp3BlLNOs9yDdogxN_wxKWPAlhkAt7GbK1sfjORXTjkzGmF4ciJSadcw9ooUpY97naaS5VTcqbw5jfoeYpMMF-bReM-epY97naa7YHJIELwcyTC.'; // 长链
$output = shortUrl($input);
var_dump($output);
?>

存数据库,字段id,short_url,long_url这些字段就够了

处理短连的接口逻辑

1
2
3
4
5
6
7
8
9
10
11
public function longUrl(){
$short_url = $_GET['code'];
$data = 查数据库;
//查不到就跳指定url
if(empty($data)){
header("location:https://www.taobao.com/");
}else{
$url = $data['long_url'];
header("location:$url");
}
}

padding的填充方式可以根据自己需要修改

php

1
2
3
4
5
6
7
$key       = 'a7gE3fH9jKmN1pQ2rS4tU6vY8zW9xL0';
$iv = '7hJ3kQxZW45mNpR';
$data = '123456';
$encrypted = openssl_encrypt($data, 'AES-256-CBC', $key, OPENSSL_RAW_DATA, $iv);
echo base64_encode($encrypted), PHP_EOL;
$decrypted = openssl_decrypt($encrypted, 'AES-256-CBC', $key, OPENSSL_RAW_DATA, $iv);
echo $decrypted;

go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/md5"
"encoding/base64"
"encoding/json"
"fmt"
)

func PKCS7Padding(ciphertext []byte, blockSize int) []byte {
padding := blockSize - len(ciphertext)%blockSize
padText := bytes.Repeat([]byte{byte(padding)}, padding)
return append(ciphertext, padText...)
}

func PKCS7UnPadding(origData []byte) []byte {
length := len(origData)
unpadding := int(origData[length-1])
return origData[:(length - unpadding)]
}

func AesEncryptCBC(orig string, key, iv string) string {
origData := []byte(orig)
k := []byte(key)

block, _ := aes.NewCipher(k)

blockSize := block.BlockSize()

origData = PKCS7Padding(origData, blockSize)

ivCopy := []byte(iv)
blockMode := cipher.NewCBCEncrypter(block, ivCopy[:blockSize])

cryted := make([]byte, len(origData))

blockMode.CryptBlocks(cryted, origData)
return base64.StdEncoding.EncodeToString(cryted)
}

func AesDecryptCBC(cryted string, key, iv string) string {
crytedByte, _ := base64.StdEncoding.DecodeString(cryted)
k := []byte(key)

block, _ := aes.NewCipher(k)

blockSize := block.BlockSize()

ivCopy := []byte(iv)
blockMode := cipher.NewCBCDecrypter(block, ivCopy[:blockSize])

orig := make([]byte, len(crytedByte))

blockMode.CryptBlocks(orig, crytedByte)

orig = PKCS7UnPadding(orig)
return string(orig)
}

func main() {
key := "a7gE3fH9jKmN1pQ2rS4tU6vY8zW9xL0"
iv := "7hJ3kQxZW45mNpR"
data := "123456"
encrypted := AesEncryptCBC(data, key, iv)
fmt.Println(encrypted)
decrypted := AesDecryptCBC(encrypted, key, iv)
fmt.Println(decrypted)
}

que.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?php
class MsgQueue
{

public $queue;

public function __construct($queue)
{
$this->queue = $queue;
}

public function push($data, $type = 1)
{
$result = msg_send($this->queue, $type, $data);
return $result;
}

public function pop($type = 0,$flags = MSG_IPC_NOWAIT)
{
msg_receive($this->queue, $type, $message_type, 1024, $message,true,$flags);
// var_dump($message_type);
return $message;
}

public function close()
{
return msg_remove_queue($this->queue);
}

public static function getQueue($path_name = __FILE__, $prop = '1', $perms = '0666')
{
$data = array();
$data['queue_key'] = ftok($path_name, $prop);
$data['queue'] = msg_get_queue($data['queue_key'], $perms);
return $data;
}
}

父子通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php
include_once 'que.php';
$message_queue_key= ftok(__FILE__, 'a');
$message_queue= msg_get_queue($message_queue_key, 0666);
$queue_obj = new MsgQueue($message_queue);
$pid = pcntl_fork();
if($pid>0){//主进程入列
while(1){
$msg = $queue_obj->push((array('a'=>321312,'v'=>'casd')));
sleep(2);
}

}else{//子进程出列

while (1) {
$message = $queue_obj->pop();
if ($message !== false) {
var_dump($message);
}
sleep(1);
}
}

跨进程通信A和B。这个时候ipc仅仅相当于一个普通队列

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
<?php
include_once 'que.php';

$message_queue_key = ftok(__FILE__, 'a');
$message_queue = msg_get_queue($message_queue_key, 0666);
$queue_obj = new MsgQueue($message_queue);

while (true) {
$msg = array('a' => 321312, 'v' => 'casd');
$queue_obj->push($msg);
echo "Sent message: " . print_r($msg, true) . "\n";
sleep(1); // 等待一段时间再发送下一条消息
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?php
include_once 'que.php';

$message_queue_key = ftok(__FILE__, 'a');
$message_queue = msg_get_queue($message_queue_key, 0666);
$queue_obj = new MsgQueue($message_queue);

while (true) {
$message = $queue_obj->pop();
if ($message !== false) {
echo "Received message: " . print_r($message, true) . "\n";
}
sleep(2); // 等待一段时间再处理下一条消息
}

使用$argv or $argc参数接收

1
2
3
<?php
echo "接收到{$argc}个参数";
print_r($argv);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
php test.php

接收到1个参数Array
(

    [0] => test.php

)
php test.php a b c d
接收到5个参数Array
(

    [0] => test.php

    [1] => a

    [2] => b

    [3] => c

    [4] => d

)

getopt

1
2
3
<?php
$param_arr = getopt('a:b:');
print_r($param_arr);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
php test.php -a 345

Array

(

    [a] => 345

)

php test.php -a 345 -b 12q3

Array

(

    [a] => 345

    [b] => 12q3

)

php test.php -a 345 -b 12q3 -e 3322ff
Array

(

    [a] => 345

    [b] => 12q3

)

fwrite

1
2
3
4
<?php
fwrite(STDOUT,'请输入您的信息:');

echo '您输入的信息是:'.fgets(STDIN);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
<?php
interface Middleware
{
public static function handle(Closure $next);
}

class Middleware1 implements Middleware
{
public static function handle(Closure $next)
{
echo "Middleware1 before\n";
$next();
echo "Middleware1 after\n";
}
}

class Middleware2 implements Middleware
{
public static function handle(Closure $next)
{
echo "Middleware2 before\n";
$next();
echo "Middleware2 after\n";
}
}

class Middleware3 implements Middleware
{
public static function handle(Closure $next)
{
echo "Middleware3 before\n";
$next();
echo "Middleware3 after\n";
}
}

function getSlice() // 返回一个函数,与上文的f一致
{
return function ($stack, $pipe) {
return function () use ($stack, $pipe) {
return $pipe::handle($stack);
};
};
}

function then()
{
$pipes = [
"Middleware1",
"Middleware2",
"Middleware3",
];

$firstSlice = function () { // 上文的目标函数 target
echo "请求向路由器传递,返回响应.\n";
};
//嵌套闭包的解包方式是先从外到内,然后从内回归到外面,所以要根据注册顺序进行逆序
$pipes = array_reverse($pipes);
$closure = array_reduce($pipes, getSlice(), $firstSlice);
var_dump($closure);//打印下该闭包
// 因为最终返回了一个函数,所以需要call_user_func
call_user_func(array_reduce($pipes, getSlice(), $firstSlice));
}
then();
echo "\n";
//array_reduce($pipes, getSlice(), $firstSlice),执行完成后相当于生成了f嵌套闭包
//function f()
//{
// return Middleware1::handle(
// function () {
// return Middleware2::handle(
// function () {
// return Middleware3::handle(function () {
// echo "请求向路由器传递,返回响应.\n";
// });
// }
// );
// }
// );
//}
//call_user_func('f');

docker安装seata

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
version: '3'
services:
seata-server:
image: seataio/seata-server:latest
ports:
- "8091:8091"
- "7091:7091"
environment:
- SEATA_PORT=8091
- STORE_MODE=file

mysql:
image: mysql:8.0.32
container_name: mysql
environment:
- MYSQL_ROOT_PASSWORD=12345678
command: --default-authentication-plugin=mysql_native_password --default-time-zone='+08:00'
volumes:
- ./mysql:/docker-entrypoint-initdb.d
- ./mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
ports:
- "3306:3306"

mysql数据库文件和mysql配置(可选不一定非要docker,只需要大于8.0就行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
CREATE database if NOT EXISTS seata_client default character set utf8mb4 collate utf8mb4_unicode_ci;
USE seata_client;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

CREATE TABLE IF NOT EXISTS order_tbl (
id int(11) NOT NULL AUTO_INCREMENT,
user_id varchar(255) DEFAULT NULL,
commodity_code varchar(255) DEFAULT NULL,
count int(11) DEFAULT '0',
money int(11) DEFAULT '0',
descs varchar(255) DEFAULT '',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO seata_client.order_tbl (id, user_id, commodity_code, count, money, descs) VALUES (1, 'NO-100001', 'C100000', 100, 10, 'init desc');

DROP TABLE IF EXISTS undo_log;

CREATE TABLE undo_log (
id bigint NOT NULL AUTO_INCREMENT,
branch_id bigint NOT NULL,
xid varchar(100) NOT NULL,
context varchar(128) NOT NULL,
rollback_info longblob NOT NULL,
log_status int NOT NULL,
log_created datetime NOT NULL,
log_modified datetime NOT NULL,
ext varchar(100) DEFAULT NULL,
PRIMARY KEY (id),
KEY idx_unionkey (xid,branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

本次主要测试at跟xa模式,at模式跟xa模式差距不大,at是seata特有的模式,需要本地一个undo_log记录数据

连接seata的配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# time 时间单位对应的是 time.Duration(1)
seata:
enabled: true
# application id
application-id: applicationName
# service group
tx-service-group: default_tx_group
access-key: aliyunAccessKey
secret-key: aliyunSecretKey
enable-auto-data-source-proxy: true
data-source-proxy-mode: AT
client:
rm:
# Maximum cache length of asynchronous queue
async-commit-buffer-limit: 10000
# The maximum number of retries when report reports the status
report-retry-count: 5
# The interval for regularly checking the metadata of the db(AT)
table-meta-check-enable: false
# Whether to report the status if the transaction is successfully executed(AT)
report-success-enable: false
# Whether to allow regular check of db metadata(AT)
saga-branch-register-enable: false
saga-json-parser: fastjson
saga-retry-persist-mode-update: false
saga-compensate-persist-mode-update: false
#Ordered.HIGHEST_PRECEDENCE + 1000 #
tcc-action-interceptor-order: -2147482648
# Parse SQL parser selection
sql-parser-type: druid
lock:
retry-interval: 30
retry-times: 10
retry-policy-branch-rollback-on-conflict: true
tm:
commit-retry-count: 5
rollback-retry-count: 5
default-global-transaction-timeout: 60s
degrade-check: false
degrade-check-period: 2000
degrade-check-allow-times: 10s
interceptor-order: -2147482648
undo:
# Judge whether the before image and after image are the same,If it is the same, undo will not be recorded
data-validation: false
# Serialization method
log-serialization: json
# undo log table name
log-table: undo_log
# Only store modified fields
only-care-update-columns: true
compress:
# Compression type. Allowed Options: None, Gzip, Zip, Sevenz, Bzip2, Lz4, Zstd, Deflate
type: None
# Compression threshold Unit: k
threshold: 64k
load-balance:
type: RandomLoadBalance
virtual-nodes: 10
service:
vgroup-mapping:
# Prefix for Print Log
default_tx_group: default
grouplist:
default: 127.0.0.1:8091
enable-degrade: false
# close the transaction
disable-global-transaction: false
transport:
shutdown:
wait: 3s
# Netty related configurations
# type
type: TCP
server: NIO
heartbeat: true
# Encoding and decoding mode
serialization: seata
# Message compression mode
compressor: none
# Allow batch sending of requests (TM)
enable-tm-client-batch-send-request: false
# Allow batch sending of requests (RM)
enable-rm-client-batch-send-request: true
# RM send request timeout
rpc-rm-request-timeout: 30s
# TM send request timeout
rpc-tm-request-timeout: 30s
# Configuration Center
config:
type: file
file:
name: config.conf
nacos:
namespace: ""
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
username: ""
password: ""
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key: ""
#secret-key: ""
data-id: seata.properties
# Registration Center
registry:
type: file
file:
name: registry.conf
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: "SEATA_GROUP"
namespace: ""
username: ""
password: ""
##if use MSE Nacos with auth, mutex with username/password attribute #
#access-key: "" #
#secret-key: "" #
log:
exception-rate: 100
tcc:
fence:
# Anti suspension table name
log-table-name: tcc_fence_log_test
clean-period: 60s
# getty configuration
getty:
reconnect-interval: 0
# temporary not supported connection-num
connection-num: 1
session:
compress-encoding: false
tcp-no-delay: true
tcp-keep-alive: true
keep-alive-period: 120s
tcp-r-buf-size: 262144
tcp-w-buf-size: 65536
tcp-read-timeout: 1s
tcp-write-timeout: 5s
wait-timeout: 1s
max-msg-len: 16498688
session-name: client_test
cron-period: 1s

utils.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package util

import (
"database/sql"
"os"

sql2 "github.com/seata/seata-go/pkg/datasource/sql"
)

func GetAtMySqlDb() *sql.DB {
dsn := "root:root@tcp(192.168.252.1:3306)/seata_client?multiStatements=true&interpolateParams=true"
dbAt, err := sql.Open(sql2.SeataATMySQLDriver, dsn)
if err != nil {
panic("init seata at mysql driver error")
}
return dbAt
}

func GetXAMySqlDb() *sql.DB {
dsn := "root:root@tcp(192.168.252.1:3306)/seata_client?multiStatements=true&interpolateParams=true"
dbAt, err := sql.Open(sql2.SeataXAMySQLDriver, dsn)
if err != nil {
panic("init seata at mysql driver error")
}
return dbAt
}

客户端,模拟用户请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"context"
"flag"
"fmt"
"github.com/parnurzeal/gorequest"
"github.com/seata/seata-go/pkg/constant"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/log"
"net/http"
"time"

"github.com/seata/seata-go/pkg/client"
)

var serverIpPort = "http://127.0.0.1:8080"

func main() {
flag.Parse()
client.InitPath("../../../conf/seatago.yml")

bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
sampleUpdate(bgCtx)

}
func updateData(ctx context.Context) (re error) {
request := gorequest.New()
log.Infof("branch transaction begin")
request.Post(serverIpPort+"/updateDataSuccess").
Set(constant.XidKey, tm.GetXID(ctx)).
End(func(response gorequest.Response, body string, errs []error) {
if response.StatusCode != http.StatusOK {
re = fmt.Errorf("update data fail")
}
})
return
}

func sampleUpdate(ctx context.Context) {
if err := tm.WithGlobalTx(ctx, &tm.GtxConfig{
Name: "ATSampleLocalGlobalTx_Update",
Timeout: time.Second * 30,
}, updateData); err != nil {
log.Info(err)
}
}

服务端,模拟api网关,往不同的微服务发请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package main

import (
"context"
"database/sql"
"fmt"
"github.com/gin-gonic/gin"
"github.com/seata/seata-go-samples/util"
"github.com/seata/seata-go/pkg/client"
ginmiddleware "github.com/seata/seata-go/pkg/integration/gin"
"github.com/seata/seata-go/pkg/util/log"
"net/http"
)

var db *sql.DB

func main() {
client.InitPath("../../../conf/seatago.yml")
//TODO 这里是不同模式
// db = util.GetXAMySqlDb() //xa
db = util.GetAtMySqlDb() //at
r := gin.Default()
r.Use(ginmiddleware.TransactionMiddleware())
r.POST("/updateDataSuccess", updateDataSuccessHandler)
if err := r.Run(":8080"); err != nil {
log.Fatalf("start tcc server fatal: %v", err)
}
}

func updateDataSuccessHandler(c *gin.Context) {
log.Infof("get tm updateData")
if err := updateDataSuccess(c); err != nil {
c.JSON(http.StatusBadRequest, "updateData failure")
return
}
if err := updateDataSuccess2(c); err != nil {
c.JSON(http.StatusBadRequest, "updateData2 failure")
return
}
//c.JSON(http.StatusOK, "updateData ok") //成功
c.JSON(http.StatusBadRequest, "updateData failure") //TODO 测试fail,回滚
}

func updateDataSuccess(ctx context.Context) error {
sql := "update order_tbl set count=? where id=?"
ret, err := db.ExecContext(ctx, sql, 10, 1)
if err != nil {
fmt.Printf("update failed, err:%v\n", err)
return nil
}

rows, err := ret.RowsAffected()
if err != nil {
fmt.Printf("update failed, err:%v\n", err)
return nil
}
fmt.Printf("更新成功 success: %d.\n", rows)
return nil
}

func updateDataSuccess2(ctx context.Context) error {
sql := "update order_tbl set count=? where id=?"
ret, err := db.ExecContext(ctx, sql, 101, 1)
if err != nil {
fmt.Printf("update failed, err:%v\n", err)
return nil
}

rows, err := ret.RowsAffected()
if err != nil {
fmt.Printf("update failed, err:%v\n", err)
return nil
}
fmt.Printf("更新成功 success: %d.\n", rows)
return nil
}

把原生sql连接改成gorm连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package main

import (
"context"
"database/sql"
"time"

"github.com/seata/seata-go/pkg/client"
sql2 "github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/tm"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

type OrderTblModel struct {
Id int64 `gorm:"column:id" json:"id"`
UserId string `gorm:"column:user_id" json:"user_id"`
CommodityCode string `gorm:"commodity_code" json:"commodity_code"`
Count int64 `gorm:"count" json:"count"`
Money int64 `gorm:"money" json:"money"`
Descs string `gorm:"descs" json:"descs"`
}

func main() {
initConfig()
// insert
tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
Name: "ATSampleLocalGlobalTx",
Timeout: time.Second * 30,
}, insertData)
<-make(chan struct{})
}

func initConfig() {
// init seata client config
client.InitPath("../../conf/seatago.yml")
// init db object
initDB()
}

var gormDB *gorm.DB

func initDB() {
sqlDB, err := sql.Open(sql2.SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true")
if err != nil {
panic("init service error")
}

gormDB, err = gorm.Open(mysql.New(mysql.Config{
Conn: sqlDB,
}), &gorm.Config{})
}

// insertData insert one data
func insertData(ctx context.Context) error {
data := OrderTblModel{
UserId: "NO-100003",
CommodityCode: "C100001",
Count: 101,
Money: 11,
Descs: "insert desc",
}

return gormDB.WithContext(ctx).Table("order_tbl").Create(&data).Error
}

// deleteData delete one data
func deleteData(ctx context.Context) error {
return gormDB.WithContext(ctx).Where("id = ?", "1").Delete(&OrderTblModel{}).Error
}

// updateDate update one data
func updateData(ctx context.Context) error {
return gormDB.WithContext(ctx).Model(&OrderTblModel{}).Where("id = ?", "1").Update("commodity_code", "C100002").Error
}

最后,如果报错 first phase error: undo log parser type jackson not found 那么请修改

这句是写死的,没有走配置

上篇用的ta跟xa。这次试试tcc

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"context"
"flag"
"fmt"
"net/http"
"time"

"github.com/parnurzeal/gorequest"

"github.com/seata/seata-go/pkg/client"
"github.com/seata/seata-go/pkg/constant"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/log"
)

func main() {
flag.Parse()
client.InitPath("../../../conf/seatago.yml")
bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
serverIpPort := "http://127.0.0.1:8080"
var jsonData = map[string]interface{}{
"name": "xxJohn",
"age": 6000,
}
tm.WithGlobalTx(
bgCtx,
&tm.GtxConfig{
Name: "TccSampleLocalGlobalTx",
},
func(ctx context.Context) (re error) {
request := gorequest.New()
log.Infof("branch transaction begin")
request.Post(serverIpPort+"/prepare").Send(jsonData).
Set("Content-Type", "application/json").
Set(constant.XidKey, tm.GetXID(ctx)).
End(func(response gorequest.Response, body string, errs []error) {
if response.StatusCode != http.StatusOK {
re = fmt.Errorf("update data fail")
}
})
return
})
}

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main

import (
"fmt"
"net/http"

"github.com/gin-gonic/gin"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/client"
ginmiddleware "github.com/seata/seata-go/pkg/integration/gin"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/util/log"
)

func main() {
client.InitPath("../../../conf/seatago.yml")

r := gin.Default()

r.Use(ginmiddleware.TransactionMiddleware())

userProviderProxy, err := tcc.NewTCCServiceProxy(&RMService{})
if err != nil {
log.Errorf("get userProviderProxy tcc service proxy error, %v", err.Error())
return
}
// 定义 JSON 数据的结构体
type Person struct {
Name string `json:"name"`
Age int `json:"age"`
}

r.POST("/prepare", func(c *gin.Context) {
var person Person
// 绑定请求体到 person 结构体
if err := c.BindJSON(&person); err != nil {
fmt.Println("请求json错误", err)
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
c.Set("rollbackData", person)
if _, err := userProviderProxy.Prepare(c, person); err != nil {
c.JSON(http.StatusBadRequest, "prepare failure")
return
}
c.JSON(http.StatusOK, "prepare ok")
//c.JSON(http.StatusBadRequest, "prepare error") // 模拟错误
})

if err := r.Run(":8080"); err != nil {
log.Fatalf("start tcc server fatal: %v", err)
}
}

type RMService struct {
Param interface{} //参数保存,防止rollback找不到
}

// 预提交事务,把参数保存起来
func (b *RMService) Prepare(ctx context.Context, params interface{}) (bool, error) {
b.Param = params
log.Infof("TRMService Prepare, param %v", params)
return true, nil
}
// 提交
func (b *RMService) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
log.Infof("RMService Commit, param %v", businessActionContext)
return true, nil
}
// 回滚
func (b *RMService) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
log.Infof("RMService Rollback, param %v", b.Param)
return true, nil
}

func (b *RMService) GetActionName() string {
return "ginTccRMService"
}