xugu_conn.go 4.8 KB


  1. package xugu
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql/driver"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "sync"
  10. )
// xuguConn holds the state of a single connection: the embedded DSN
// settings, the underlying network connection, and per-connection buffers.
type xuguConn struct {
	dsnConfig
	conn        net.Conn
	mu          sync.Mutex // locked by Close, Query, Ping and Prepare while they use the connection
	useSSL      bool       // whether encryption is used
	havePrepare int        //default 0
	prepareNo   int        // counter appended to the prepared-statement name by xuguPrepare, then incremented
	prepareName string     // name of the most recent prepared statement, set by xuguPrepare
	//presPrepareCata *Result
	errStr   []byte
	sendBuff bytes.Buffer
	readBuff buffer // receive buffer; Ping resets it after a successful round trip
}
// dsnConfig holds the connection settings, all stored as strings.
// NOTE(review): presumably filled in by parsing the DSN — confirm against
// the code that constructs it.
type dsnConfig struct {
	IP             string
	Port           string
	Database       string
	User           string
	Password       string
	Encryptor      string // decryption passphrase for the encrypted database
	CharSet        string // name of the character set used by the client
	TimeZone       string
	IsoLevel       string // transaction isolation level
	LockTimeout    string // lock wait timeout
	AutoCommit     string
	StrictCommit   string
	Result         string
	ReturnSchema   string
	ReturnCursorID string
	LobRet         string
	ReturnRowid    string
	Version        string
}
  44. func (xgConn *xuguConn) Begin() (driver.Tx, error) {
  45. gt("xuguConn.Begin")
  46. defer gt("xuguConn.Begin end")
  47. return nil, nil
  48. }
  49. func (xgConn *xuguConn) Close() error {
  50. gt("xuguConn.Close")
  51. defer gt("xuguConn.Close end")
  52. xgConn.mu.Lock()
  53. defer xgConn.mu.Unlock()
  54. err := xgConn.conn.Close()
  55. if err != nil {
  56. fmt.Println("Close connection error")
  57. xgConn.mu.Unlock()
  58. return err
  59. }
  60. return nil
  61. }
  62. func (xgConn *xuguConn) Exec(sql string,
  63. args []driver.Value) (driver.Result, error) {
  64. gt("xuguConn.Exec")
  65. defer gt("xuguConn.Exec end")
  66. return nil, nil
  67. }
  68. func (xgConn *xuguConn) Query(sql string,
  69. args []driver.Value) (driver.Rows, error) {
  70. gt("xuguConn.Query")
  71. defer gt("xuguConn.Query end")
  72. xgConn.mu.Lock()
  73. defer xgConn.mu.Unlock()
  74. // 检测sql语句不是查询则报错
  75. if switchSQLType(sql) != SQL_SELECT {
  76. return nil, errors.New("The executed SQL statement is not a SELECT")
  77. }
  78. // 有传进来的参数
  79. if len(args) != 0 {
  80. values := []xuguValue{}
  81. //判断类型
  82. for _, param := range args {
  83. err := assertParamType(param, &values)
  84. if err != nil {
  85. return nil, err
  86. }
  87. }
  88. //send msg
  89. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  90. return nil, err
  91. }
  92. if err := sockSendExecute(xgConn); err != nil {
  93. return nil, err
  94. }
  95. } else {
  96. //send msg
  97. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  98. return nil, err
  99. }
  100. if err := sockSendExecute(xgConn); err != nil {
  101. return nil, err
  102. }
  103. }
  104. //recv msg
  105. aR, err := xuguSockRecvMsg(xgConn)
  106. if err != nil {
  107. return nil, err
  108. }
  109. switch aR.rt {
  110. case selectResult:
  111. rows := &xuguRows{
  112. rows_conn: xgConn,
  113. results: aR.s,
  114. colIdx: 0,
  115. prepared: false,
  116. }
  117. return rows, nil
  118. case errInfo:
  119. return nil, errors.New(string(aR.e.ErrStr))
  120. case warnInfo:
  121. return nil, errors.New(string(aR.w.WarnStr))
  122. default:
  123. }
  124. return nil, errors.New("xugu Query error")
  125. }
  126. func (xgConn *xuguConn) Ping(ctx context.Context) error {
  127. gt("xuguConn.Ping")
  128. defer gt("xuguConn.Ping end")
  129. //send
  130. xgConn.mu.Lock()
  131. sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0)
  132. sockSendExecute(xgConn)
  133. _, err := xuguSockRecvMsg(xgConn)
  134. if err != nil {
  135. fmt.Println("Ping err : ", err.Error())
  136. return err
  137. }
  138. xgConn.readBuff.reset()
  139. xgConn.mu.Unlock()
  140. return nil
  141. }
  142. func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
  143. gt("xuguConn.Prepare")
  144. defer gt("xuguConn.Prepare end")
  145. xgConn.mu.Lock()
  146. defer xgConn.mu.Unlock()
  147. //判断sql类型
  148. switch switchSQLType(sql) {
  149. case SQL_PROCEDURE:
  150. return nil, errors.New("Prepare does not support stored procedures")
  151. case SQL_UNKNOWN:
  152. return nil, errors.New("Unknown SQL statement type")
  153. case SQL_CREATE:
  154. return nil, errors.New("Prepare does not support DDL.")
  155. }
  156. //发送创建prepare
  157. prepareName := "GTONG"
  158. err := xuguPrepare(xgConn, sql, prepareName)
  159. if err != nil {
  160. return nil, err
  161. }
  162. count := assertParamCount(sql)
  163. stmt := &xuguStmt{
  164. stmt_conn: xgConn,
  165. prepared: true,
  166. prename: make([]byte, 128),
  167. curopend: false,
  168. curname: make([]byte, 128),
  169. paramCount: count,
  170. mysql: sql,
  171. }
  172. stmt.prename = []byte(xgConn.prepareName)
  173. return stmt, nil
  174. }
  175. func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
  176. fmt.Println("\n ---xuguPrepare")
  177. prepareName = fmt.Sprintf("%s%d", prepareName, pConn.prepareNo)
  178. sqlRet := fmt.Sprintf("PREPARE %s AS %s", prepareName, cmd_sql)
  179. pConn.prepareName = prepareName
  180. pConn.prepareNo++
  181. //send msg
  182. sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
  183. sockSendExecute(pConn)
  184. //recv msg
  185. _, err := xuguSockRecvMsg(pConn)
  186. if err != nil {
  187. fmt.Println("xuguPrepare parseMsg(&pConn.readBuff, pConn)")
  188. return err
  189. }
  190. fmt.Println("Message from server:", pConn.readBuff.buf[pConn.readBuff.idx:])
  191. fmt.Println("Message from server:", string(pConn.readBuff.buf[pConn.readBuff.idx:]))
  192. fmt.Println("\n ---xuguPrepare end")
  193. return nil
  194. }