xugu_conn.go 7.5 KB


  1. package xugu
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql/driver"
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "net"
  10. "sync"
  11. )
  12. type xuguConn struct {
  13. dsnConfig
  14. conn net.Conn
  15. mu sync.Mutex
  16. useSSL bool // 是否使用加密
  17. havePrepare int //default 0
  18. prepareNo int
  19. prepareName string
  20. //presPrepareCata *Result
  21. errStr []byte
  22. sendBuff bytes.Buffer
  23. readBuff buffer
  24. }
  25. type dsnConfig struct {
  26. IP string
  27. Port string
  28. Database string
  29. User string
  30. Password string
  31. Encryptor string //加密库的解密口令
  32. CharSet string //客户端使用的字符集名
  33. TimeZone string
  34. IsoLevel string //事务隔离级别
  35. LockTimeout string //加锁超时
  36. AutoCommit string
  37. StrictCommit string
  38. Result string
  39. ReturnSchema string
  40. ReturnCursorID string
  41. LobRet string
  42. ReturnRowid string
  43. Version string
  44. }
  45. func (xgConn *xuguConn) Begin() (driver.Tx, error) {
  46. return &xuguTx{tconn: xgConn}, nil
  47. }
  48. func (xgConn *xuguConn) Close() error {
  49. xgConn.mu.Lock()
  50. defer xgConn.mu.Unlock()
  51. err := xgConn.conn.Close()
  52. if err != nil {
  53. xgConn.mu.Unlock()
  54. return err
  55. }
  56. return nil
  57. }
  58. func (xgConn *xuguConn) Query(sql string,
  59. args []driver.Value) (driver.Rows, error) {
  60. xgConn.mu.Lock()
  61. defer xgConn.mu.Unlock()
  62. // 检测sql语句不是查询则报错
  63. if switchSQLType(sql) != SQL_SELECT {
  64. return nil, errors.New("The executed SQL statement is not a SELECT")
  65. }
  66. // 有传进来的参数
  67. if len(args) != 0 {
  68. values := []xuguValue{}
  69. //判断类型
  70. for _, param := range args {
  71. err := assertParamType(param, &values)
  72. if err != nil {
  73. return nil, err
  74. }
  75. }
  76. //send msg
  77. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  78. return nil, err
  79. }
  80. if err := sockSendExecute(xgConn); err != nil {
  81. return nil, err
  82. }
  83. } else {
  84. //send msg
  85. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  86. return nil, err
  87. }
  88. if err := sockSendExecute(xgConn); err != nil {
  89. return nil, err
  90. }
  91. }
  92. //recv msg
  93. aR, err := xuguSockRecvMsg(xgConn)
  94. if err != nil {
  95. return nil, err
  96. }
  97. switch aR.rt {
  98. case selectResult:
  99. rows := &xuguRows{
  100. rows_conn: xgConn,
  101. results: aR.s,
  102. colIdx: 0,
  103. prepared: false,
  104. }
  105. return rows, nil
  106. case errInfo:
  107. return nil, errors.New(string(aR.e.ErrStr))
  108. case warnInfo:
  109. return nil, errors.New(string(aR.w.WarnStr))
  110. default:
  111. }
  112. return nil, errors.New("xugu Query error")
  113. }
  114. func (xgConn *xuguConn) Ping(ctx context.Context) error {
  115. //send
  116. xgConn.mu.Lock()
  117. defer xgConn.mu.Unlock()
  118. sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0)
  119. sockSendExecute(xgConn)
  120. _, err := xuguSockRecvMsg(xgConn)
  121. if err != nil {
  122. return err
  123. }
  124. xgConn.readBuff.reset()
  125. return nil
  126. }
  127. func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
  128. xgConn.mu.Lock()
  129. defer xgConn.mu.Unlock()
  130. //判断sql类型
  131. switch switchSQLType(sql) {
  132. case SQL_PROCEDURE:
  133. return nil, errors.New("Prepare does not support stored procedures")
  134. case SQL_UNKNOWN:
  135. return nil, errors.New("Unknown SQL statement type")
  136. case SQL_CREATE:
  137. return nil, errors.New("Prepare does not support DDL.")
  138. }
  139. //发送创建prepare
  140. prepareName := "GTONG"
  141. err := xuguPrepare(xgConn, sql, prepareName)
  142. if err != nil {
  143. return nil, err
  144. }
  145. count := assertParamCount(sql)
  146. stmt := &xuguStmt{
  147. stmt_conn: xgConn,
  148. prepared: true,
  149. prename: make([]byte, 128),
  150. curopend: false,
  151. curname: make([]byte, 128),
  152. paramCount: count,
  153. mysql: sql,
  154. }
  155. stmt.prename = []byte(fmt.Sprintf("? %s", xgConn.prepareName))
  156. return stmt, nil
  157. }
  158. func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
  159. prepareName = fmt.Sprintf("%s%d", prepareName, pConn.prepareNo)
  160. sqlRet := fmt.Sprintf("PREPARE %s AS %s", prepareName, cmd_sql)
  161. pConn.prepareName = prepareName
  162. pConn.prepareNo++
  163. //send msg
  164. sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
  165. sockSendExecute(pConn)
  166. //recv msg
  167. aR, err := xuguSockRecvMsg(pConn)
  168. switch aR.rt {
  169. case errInfo:
  170. return errors.New(string(aR.e.ErrStr))
  171. case warnInfo:
  172. return errors.New(string(aR.w.WarnStr))
  173. default:
  174. }
  175. if err != nil {
  176. return err
  177. }
  178. return nil
  179. }
  180. func xuguUnPrepare(pConn *xuguConn, prepareName string) error {
  181. sqlRet := fmt.Sprintf("DEALLOCATE %s ", prepareName)
  182. //send msg
  183. sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
  184. sockSendExecute(pConn)
  185. //recv msg
  186. aR, err := xuguSockRecvMsg(pConn)
  187. switch aR.rt {
  188. case 'K':
  189. return nil
  190. case errInfo:
  191. return errors.New(string(aR.e.ErrStr))
  192. case warnInfo:
  193. return errors.New(string(aR.w.WarnStr))
  194. default:
  195. }
  196. if err != nil {
  197. return err
  198. }
  199. return nil
  200. }
  201. func (xgConn *xuguConn) Exec(sql string,
  202. args []driver.Value) (driver.Result, error) {
  203. xgConn.mu.Lock()
  204. defer xgConn.mu.Unlock()
  205. // 检测sql语句是查询则报错
  206. if switchSQLType(sql) == SQL_SELECT {
  207. return nil, errors.New("The executed SQL statement is a SELECT")
  208. }
  209. // 有传进来的参数
  210. if len(args) != 0 {
  211. values := []xuguValue{}
  212. //判断类型
  213. for _, param := range args {
  214. err := assertParamType(param, &values)
  215. if err != nil {
  216. return nil, err
  217. }
  218. }
  219. //send msg
  220. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  221. return nil, err
  222. }
  223. if err := sockSendExecute(xgConn); err != nil {
  224. return nil, err
  225. }
  226. } else {
  227. //send msg
  228. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  229. return nil, err
  230. }
  231. if err := sockSendExecute(xgConn); err != nil {
  232. return nil, err
  233. }
  234. }
  235. //recv msg
  236. aR, err := xuguSockRecvMsg(xgConn)
  237. if err != nil {
  238. return nil, err
  239. }
  240. switch aR.rt {
  241. case selectResult:
  242. return nil, errors.New("exec is Query error")
  243. case errInfo:
  244. return nil, errors.New(string(aR.e.ErrStr))
  245. case warnInfo:
  246. return nil, errors.New(string(aR.w.WarnStr))
  247. case updateResult:
  248. return &xuguResult{affectedRows: int64(aR.u.UpdateNum), insertId: int64(0)}, nil
  249. case insertResult:
  250. return &xuguResult{affectedRows: int64(0), insertId: int64(aR.i.RowidLen)}, nil
  251. default:
  252. return &xuguResult{
  253. affectedRows: int64(0),
  254. insertId: int64(0),
  255. }, nil
  256. }
  257. }
  258. func (xgConn *xuguConn) exec(sql string, args []driver.Value) (driver.Result, error) {
  259. // 有传进来的参数
  260. if len(args) != 0 {
  261. values := []xuguValue{}
  262. //判断类型
  263. for _, param := range args {
  264. err := assertParamType(param, &values)
  265. if err != nil {
  266. return nil, err
  267. }
  268. }
  269. //send msg
  270. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  271. return nil, err
  272. }
  273. if err := sockSendExecute(xgConn); err != nil {
  274. return nil, err
  275. }
  276. } else {
  277. //send msg
  278. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  279. return nil, err
  280. }
  281. if err := sockSendExecute(xgConn); err != nil {
  282. return nil, err
  283. }
  284. }
  285. //recv msg
  286. aR, err := xuguSockRecvMsg(xgConn)
  287. if err != nil {
  288. return nil, err
  289. }
  290. switch aR.rt {
  291. case selectResult:
  292. return nil, errors.New("exec is Query error")
  293. case updateResult:
  294. return &xuguResult{affectedRows: int64(aR.u.UpdateNum), insertId: int64(0)}, nil
  295. case insertResult:
  296. return &xuguResult{affectedRows: int64(0), insertId: int64(binary.LittleEndian.Uint64(aR.i.RowidData))}, nil
  297. case errInfo:
  298. return nil, errors.New(string(aR.e.ErrStr))
  299. case warnInfo:
  300. return nil, errors.New(string(aR.w.WarnStr))
  301. default:
  302. return &xuguResult{
  303. affectedRows: int64(0),
  304. insertId: int64(0),
  305. }, nil
  306. }
  307. }