123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- package xugu
- import (
- "bytes"
- "context"
- "database/sql/driver"
- "encoding/binary"
- "errors"
- "fmt"
- "net"
- "sync"
- )
- type xuguConn struct {
- dsnConfig
- conn net.Conn
- mu sync.Mutex
- useSSL bool // 是否使用加密
- havePrepare int //default 0
- prepareNo int
- prepareName string
- //presPrepareCata *Result
- errStr []byte
- sendBuff bytes.Buffer
- readBuff buffer
- }
- type dsnConfig struct {
- IP string
- Port string
- Database string
- User string
- Password string
- Encryptor string //加密库的解密口令
- CharSet string //客户端使用的字符集名
- TimeZone string
- IsoLevel string //事务隔离级别
- LockTimeout string //加锁超时
- AutoCommit string
- StrictCommit string
- Result string
- ReturnSchema string
- ReturnCursorID string
- LobRet string
- ReturnRowid string
- Version string
- }
- func (xgConn *xuguConn) Begin() (driver.Tx, error) {
- _, err := xgConn.exec("Begin;", nil)
- if err != nil {
- return nil, err
- }
- return &xuguTx{tconn: xgConn}, nil
- }
- func (xgConn *xuguConn) Close() error {
- xgConn.mu.Lock()
- defer xgConn.mu.Unlock()
- err := xgConn.conn.Close()
- if err != nil {
- xgConn.mu.Unlock()
- return err
- }
- return nil
- }
- func (xgConn *xuguConn) Query(sql string,
- args []driver.Value) (driver.Rows, error) {
- xgConn.mu.Lock()
- defer xgConn.mu.Unlock()
- // 检测sql语句不是查询则报错
- if switchSQLType(sql) != SQL_SELECT {
- return nil, errors.New("The executed SQL statement is not a SELECT")
- }
- // 有传进来的参数
- if len(args) != 0 {
- values := []xuguValue{}
- //判断类型
- for _, param := range args {
- err := assertParamType(param, &values)
- if err != nil {
- return nil, err
- }
- }
- //send msg
- if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
- return nil, err
- }
- if err := sockSendExecute(xgConn); err != nil {
- return nil, err
- }
- } else {
- //send msg
- if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
- return nil, err
- }
- if err := sockSendExecute(xgConn); err != nil {
- return nil, err
- }
- }
- //recv msg
- aR, err := xuguSockRecvMsg(xgConn)
- if err != nil {
- return nil, err
- }
- switch aR.rt {
- case selectResult:
- rows := &xuguRows{
- rows_conn: xgConn,
- results: aR.s,
- colIdx: 0,
- prepared: false,
- }
- return rows, nil
- case errInfo:
- return nil, errors.New(string(aR.e.ErrStr))
- case warnInfo:
- return nil, errors.New(string(aR.w.WarnStr))
- default:
- }
- return nil, errors.New("xugu Query error")
- }
- func (xgConn *xuguConn) Ping(ctx context.Context) error {
- //send
- xgConn.mu.Lock()
- defer xgConn.mu.Unlock()
- sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0)
- sockSendExecute(xgConn)
- _, err := xuguSockRecvMsg(xgConn)
- if err != nil {
- return err
- }
- xgConn.readBuff.reset()
- return nil
- }
- func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
- xgConn.mu.Lock()
- defer xgConn.mu.Unlock()
- //判断sql类型
- switch switchSQLType(sql) {
- case SQL_PROCEDURE:
- return nil, errors.New("Prepare does not support stored procedures")
- case SQL_UNKNOWN:
- return nil, errors.New("Unknown SQL statement type")
- case SQL_CREATE:
- return nil, errors.New("Prepare does not support DDL.")
- }
- //发送创建prepare
- prepareName := "GTONG"
- err := xuguPrepare(xgConn, sql, prepareName)
- if err != nil {
- return nil, err
- }
- count := assertParamCount(sql)
- stmt := &xuguStmt{
- stmt_conn: xgConn,
- prepared: true,
- prename: make([]byte, 128),
- curopend: false,
- curname: make([]byte, 128),
- paramCount: count,
- mysql: sql,
- }
- stmt.prename = []byte(fmt.Sprintf("? %s", xgConn.prepareName))
- return stmt, nil
- }
- func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
- prepareName = fmt.Sprintf("%s%d", prepareName, pConn.prepareNo)
- sqlRet := fmt.Sprintf("PREPARE %s AS %s", prepareName, cmd_sql)
- pConn.prepareName = prepareName
- pConn.prepareNo++
- //send msg
- sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
- sockSendExecute(pConn)
- //recv msg
- aR, err := xuguSockRecvMsg(pConn)
- switch aR.rt {
- case errInfo:
- return errors.New(string(aR.e.ErrStr))
- case warnInfo:
- return errors.New(string(aR.w.WarnStr))
- default:
- }
- if err != nil {
- return err
- }
- return nil
- }
- func xuguUnPrepare(pConn *xuguConn, prepareName string) error {
- sqlRet := fmt.Sprintf("DEALLOCATE %s ", prepareName)
- //send msg
- sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
- sockSendExecute(pConn)
- //recv msg
- aR, err := xuguSockRecvMsg(pConn)
- switch aR.rt {
- case 'K':
- return nil
- case errInfo:
- return errors.New(string(aR.e.ErrStr))
- case warnInfo:
- return errors.New(string(aR.w.WarnStr))
- default:
- }
- if err != nil {
- return err
- }
- return nil
- }
- func (xgConn *xuguConn) Exec(sql string,
- args []driver.Value) (driver.Result, error) {
- xgConn.mu.Lock()
- defer xgConn.mu.Unlock()
- // 检测sql语句是查询则报错
- if switchSQLType(sql) == SQL_SELECT {
- return nil, errors.New("The executed SQL statement is a SELECT")
- }
- // 有传进来的参数
- if len(args) != 0 {
- values := []xuguValue{}
- //判断类型
- for _, param := range args {
- err := assertParamType(param, &values)
- if err != nil {
- return nil, err
- }
- }
- //send msg
- if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
- return nil, err
- }
- if err := sockSendExecute(xgConn); err != nil {
- return nil, err
- }
- } else {
- //send msg
- if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
- return nil, err
- }
- if err := sockSendExecute(xgConn); err != nil {
- return nil, err
- }
- }
- //recv msg
- aR, err := xuguSockRecvMsg(xgConn)
- if err != nil {
- return nil, err
- }
- switch aR.rt {
- case selectResult:
- return nil, errors.New("exec is Query error")
- case errInfo:
- return nil, errors.New(string(aR.e.ErrStr))
- case warnInfo:
- return nil, errors.New(string(aR.w.WarnStr))
- case updateResult:
- return &xuguResult{affectedRows: int64(aR.u.UpdateNum), insertId: int64(0)}, nil
- case insertResult:
- return &xuguResult{affectedRows: int64(0), insertId: int64(aR.i.RowidLen)}, nil
- default:
- return &xuguResult{
- affectedRows: int64(0),
- insertId: int64(0),
- }, nil
- }
- }
- func (xgConn *xuguConn) exec(sql string, args []driver.Value) (driver.Result, error) {
- // 有传进来的参数
- if len(args) != 0 {
- values := []xuguValue{}
- //判断类型
- for _, param := range args {
- err := assertParamType(param, &values)
- if err != nil {
- return nil, err
- }
- }
- //send msg
- if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
- return nil, err
- }
- if err := sockSendExecute(xgConn); err != nil {
- return nil, err
- }
- } else {
- //send msg
- if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
- return nil, err
- }
- if err := sockSendExecute(xgConn); err != nil {
- return nil, err
- }
- }
- //recv msg
- aR, err := xuguSockRecvMsg(xgConn)
- if err != nil {
- return nil, err
- }
- switch aR.rt {
- case selectResult:
- return nil, errors.New("exec is Query error")
- case updateResult:
- return &xuguResult{affectedRows: int64(aR.u.UpdateNum), insertId: int64(0)}, nil
- case insertResult:
- return &xuguResult{affectedRows: int64(0), insertId: int64(binary.LittleEndian.Uint64(aR.i.RowidData))}, nil
- case errInfo:
- return nil, errors.New(string(aR.e.ErrStr))
- case warnInfo:
- return nil, errors.New(string(aR.w.WarnStr))
- default:
- return &xuguResult{
- affectedRows: int64(0),
- insertId: int64(0),
- }, nil
- }
- }
|