ballerina 学习十八 事务编程

时间:2023-03-09 18:25:45
ballerina 学习十八  事务编程

事务在分布式开发,以及微服务开发中是比较重要的

ballerina 支持 本地事务、xa 事务、分布式事务 ,但是具体的服务实现起来需要按照ballerian 的事务模型 infection agreement

基本事务使用(本地事务)

  • 参考代码(数据库)
import ballerina/mysql;
import ballerina/io;
endpoint mysql:Client testDB {
host: "localhost",
port: 3306,
name: "testdb",
username: "root",
password: "root",
poolOptions: { maximumPoolSize: 5 }
};function main(string... args) {
var ret = testDB->update("CREATE TABLE CUSTOMER (ID INT, NAME
VARCHAR(30))");
handleUpdate(ret, "Create CUSTOMER table"); ret = testDB->update("CREATE TABLE SALARY (ID INT, MON_SALARY FLOAT)");
handleUpdate(ret, "Create SALARY table");
transaction with retries = 4, oncommit = onCommitFunction,
onabort = onAbortFunction {
var result = testDB->update("INSERT INTO CUSTOMER(ID,NAME)
VALUES (1, 'Anne')");
result = testDB->update("INSERT INTO SALARY (ID, MON_SALARY)
VALUES (1, 2500)");
match result {
int c => {
io:println("Inserted count: " + c);
if (c == 0) {
abort;
}
}
error err => {
retry;
}
}
} onretry {
io:println("Retrying transaction");
}
ret = testDB->update("DROP TABLE CUSTOMER");
handleUpdate(ret, "Drop table CUSTOMER"); ret = testDB->update("DROP TABLE SALARY");
handleUpdate(ret, "Drop table SALARY");
testDB.stop();
}
function onCommitFunction(string transactionId) {
io:println("Transaction: " + transactionId + " committed");
}
function onAbortFunction(string transactionId) {
io:println("Transaction: " + transactionId + " aborted");
}
function handleUpdate(int|error returned, string message) {
match returned {
int retInt => io:println(message + " status: " + retInt);
error err => io:println(message + " failed: " + err.message);
}
}

分布式事务

import ballerina/math;
import ballerina/http;
import ballerina/log;
import ballerina/transactions;
@http:ServiceConfig {
basePath: "/"
}
service<http:Service> InitiatorService bind { port: 8080 } { @http:ResourceConfig {
methods: ["GET"],
path: "/"
}
init(endpoint conn, http:Request req) {
http:Response res = new;
log:printInfo("Initiating transaction...");
transaction with oncommit = printCommit,
onabort = printAbort {
log:printInfo("Started transaction: " +
transactions:getCurrentTransactionId());
boolean successful = callBusinessService();
if (successful) {
res.statusCode = http:OK_200;
} else {
res.statusCode = http:INTERNAL_SERVER_ERROR_500;
abort;
}
} var result = conn->respond(res);
match result {
error e =>
log:printError("Could not send response back to client", err = e);
() =>
log:printInfo("Sent response back to client");
}
}
}
function printAbort(string transactionId) {
log:printInfo("Initiated transaction: " + transactionId + " aborted");
}
function printCommit(string transactionId) {
log:printInfo("Initiated transaction: " + transactionId + " committed");
}function callBusinessService() returns boolean {
endpoint http:Client participantEP {
url: "http://localhost:8889/stockquote/update"
}; boolean successful; float price = math:randomInRange(200, 250) + math:random();
json bizReq = { symbol: "GOOG", price: price };
http:Request req = new;
req.setJsonPayload(bizReq);
var result = participantEP->post("", request = req);
log:printInfo("Got response from bizservice");
match result {
http:Response res => {
successful = (res.statusCode == http:OK_200) ? true : false;
}
error => successful = false;
}
return successful;
} import ballerina/log;
import ballerina/io;
import ballerina/http;
import ballerina/transactions;
@http:ServiceConfig {
basePath: "/stockquote"
}
service<http:Service> ParticipantService bind { port: 8889 } { @http:ResourceConfig {
path: "/update"
}
updateStockQuote(endpoint conn, http:Request req) {
log:printInfo("Received update stockquote request");
http:Response res = new;
transaction with oncommit = printParticipantCommit,
onabort = printParticipantAbort { log:printInfo("Joined transaction: " +
transactions:getCurrentTransactionId()); var updateReq = untaint req.getJsonPayload();
match updateReq {
json updateReqJson => {
string msg =
io:sprintf("Update stock quote request received.
symbol:%j, price:%j",
updateReqJson.symbol,
updateReqJson.price);
log:printInfo(msg); json jsonRes = { "message": "updating stock" };
res.statusCode = http:OK_200;
res.setJsonPayload(jsonRes);
}
error e => {
res.statusCode = http:INTERNAL_SERVER_ERROR_500;
res.setPayload(e.message);
log:printError("Payload error occurred!", err = e);
}
} var result = conn->respond(res);
match result {
error e =>
log:printError("Could not send response back to initiator",
err = e);
() =>
log:printInfo("Sent response back to initiator");
}
}
}
}
function printParticipantAbort(string transactionId) {
log:printInfo("Participated transaction: " + transactionId + " aborted");
}
function printParticipantCommit(string transactionId) {
log:printInfo("Participated transaction: " + transactionId + " committed");
}

参考资料

https://ballerina.io/learn/by-example/transactions-distributed.html
https://ballerina.io/learn/by-example/local-transactions.html