xugu_conn.go 8.2 KB


  1. package xugu
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql/driver"
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "net"
  10. "sync"
  11. )
  12. type xuguConn struct {
  13. dsnConfig
  14. conn net.Conn
  15. mu sync.Mutex
  16. useSSL bool // 是否使用加密
  17. havePrepare int //default 0
  18. prepareNo int
  19. prepareName string
  20. //presPrepareCata *Result
  21. errStr []byte
  22. sendBuff bytes.Buffer
  23. readBuff buffer
  24. }
  25. type dsnConfig struct {
  26. IP string
  27. Port string
  28. Database string
  29. User string
  30. Password string
  31. Encryptor string //加密库的解密口令
  32. CharSet string //客户端使用的字符集名
  33. TimeZone string
  34. IsoLevel string //事务隔离级别
  35. LockTimeout string //加锁超时
  36. AutoCommit string
  37. StrictCommit string
  38. Result string
  39. ReturnSchema string
  40. ReturnCursorID string
  41. LobRet string
  42. ReturnRowid string
  43. Version string
  44. }
  45. func (xgConn *xuguConn) Begin() (driver.Tx, error) {
  46. gt("xuguConn.Begin")
  47. defer gt("xuguConn.Begin end")
  48. return &xuguTx{tconn: xgConn}, nil
  49. }
  50. func (xgConn *xuguConn) Close() error {
  51. gt("xuguConn.Close")
  52. defer gt("xuguConn.Close end")
  53. xgConn.mu.Lock()
  54. defer xgConn.mu.Unlock()
  55. err := xgConn.conn.Close()
  56. if err != nil {
  57. fmt.Println("Close connection error")
  58. xgConn.mu.Unlock()
  59. return err
  60. }
  61. return nil
  62. }
  63. func (xgConn *xuguConn) Query(sql string,
  64. args []driver.Value) (driver.Rows, error) {
  65. gt("xuguConn.Query")
  66. defer gt("xuguConn.Query end")
  67. xgConn.mu.Lock()
  68. defer xgConn.mu.Unlock()
  69. // 检测sql语句不是查询则报错
  70. if switchSQLType(sql) != SQL_SELECT {
  71. return nil, errors.New("The executed SQL statement is not a SELECT")
  72. }
  73. // 有传进来的参数
  74. if len(args) != 0 {
  75. values := []xuguValue{}
  76. //判断类型
  77. for _, param := range args {
  78. err := assertParamType(param, &values)
  79. if err != nil {
  80. return nil, err
  81. }
  82. }
  83. //send msg
  84. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  85. return nil, err
  86. }
  87. if err := sockSendExecute(xgConn); err != nil {
  88. return nil, err
  89. }
  90. } else {
  91. //send msg
  92. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  93. return nil, err
  94. }
  95. if err := sockSendExecute(xgConn); err != nil {
  96. return nil, err
  97. }
  98. }
  99. //recv msg
  100. aR, err := xuguSockRecvMsg(xgConn)
  101. if err != nil {
  102. return nil, err
  103. }
  104. switch aR.rt {
  105. case selectResult:
  106. rows := &xuguRows{
  107. rows_conn: xgConn,
  108. results: aR.s,
  109. colIdx: 0,
  110. prepared: false,
  111. }
  112. return rows, nil
  113. case errInfo:
  114. return nil, errors.New(string(aR.e.ErrStr))
  115. case warnInfo:
  116. return nil, errors.New(string(aR.w.WarnStr))
  117. default:
  118. }
  119. return nil, errors.New("xugu Query error")
  120. }
  121. func (xgConn *xuguConn) Ping(ctx context.Context) error {
  122. gt("xuguConn.Ping")
  123. defer gt("xuguConn.Ping end")
  124. //send
  125. xgConn.mu.Lock()
  126. defer xgConn.mu.Unlock()
  127. sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0)
  128. sockSendExecute(xgConn)
  129. _, err := xuguSockRecvMsg(xgConn)
  130. if err != nil {
  131. fmt.Println("Ping err : ", err.Error())
  132. return err
  133. }
  134. xgConn.readBuff.reset()
  135. return nil
  136. }
  137. func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
  138. gt("xuguConn.Prepare")
  139. defer gt("xuguConn.Prepare end")
  140. xgConn.mu.Lock()
  141. defer xgConn.mu.Unlock()
  142. //判断sql类型
  143. switch switchSQLType(sql) {
  144. case SQL_PROCEDURE:
  145. return nil, errors.New("Prepare does not support stored procedures")
  146. case SQL_UNKNOWN:
  147. return nil, errors.New("Unknown SQL statement type")
  148. case SQL_CREATE:
  149. return nil, errors.New("Prepare does not support DDL.")
  150. }
  151. //发送创建prepare
  152. prepareName := "GTONG"
  153. err := xuguPrepare(xgConn, sql, prepareName)
  154. fmt.Println("xuguPrepare err : ", err)
  155. if err != nil {
  156. return nil, err
  157. }
  158. count := assertParamCount(sql)
  159. stmt := &xuguStmt{
  160. stmt_conn: xgConn,
  161. prepared: true,
  162. prename: make([]byte, 128),
  163. curopend: false,
  164. curname: make([]byte, 128),
  165. paramCount: count,
  166. mysql: sql,
  167. }
  168. stmt.prename = []byte(fmt.Sprintf("? %s", xgConn.prepareName))
  169. return stmt, nil
  170. }
  171. func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
  172. fmt.Println("\n ---xuguPrepare")
  173. prepareName = fmt.Sprintf("%s%d", prepareName, pConn.prepareNo)
  174. sqlRet := fmt.Sprintf("PREPARE %s AS %s", prepareName, cmd_sql)
  175. pConn.prepareName = prepareName
  176. pConn.prepareNo++
  177. //send msg
  178. sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
  179. sockSendExecute(pConn)
  180. //recv msg
  181. aR, err := xuguSockRecvMsg(pConn)
  182. switch aR.rt {
  183. case errInfo:
  184. return errors.New(string(aR.e.ErrStr))
  185. case warnInfo:
  186. return errors.New(string(aR.w.WarnStr))
  187. default:
  188. }
  189. if err != nil {
  190. fmt.Println("xuguPrepare parseMsg(&pConn.readBuff, pConn)")
  191. return err
  192. }
  193. fmt.Println("\n ---xuguPrepare end")
  194. return nil
  195. }
  196. func xuguUnPrepare(pConn *xuguConn, prepareName string) error {
  197. fmt.Println("\n ---xuguUnPrepare")
  198. sqlRet := fmt.Sprintf("DEALLOCATE %s ", prepareName)
  199. fmt.Println("sqlRet : ", sqlRet)
  200. fmt.Println("prepareName : ", prepareName)
  201. //send msg
  202. sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
  203. sockSendExecute(pConn)
  204. //recv msg
  205. aR, err := xuguSockRecvMsg(pConn)
  206. switch aR.rt {
  207. case 'K':
  208. return nil
  209. case errInfo:
  210. return errors.New(string(aR.e.ErrStr))
  211. case warnInfo:
  212. return errors.New(string(aR.w.WarnStr))
  213. default:
  214. }
  215. if err != nil {
  216. fmt.Println("xuguUnPrepare parseMsg(&pConn.readBuff, pConn)")
  217. return err
  218. }
  219. fmt.Println("\n ---xuguUnPrepare end")
  220. return nil
  221. }
  222. func (xgConn *xuguConn) Exec(sql string,
  223. args []driver.Value) (driver.Result, error) {
  224. gt("xuguConn.Exec")
  225. defer gt("xuguConn.Exec end")
  226. xgConn.mu.Lock()
  227. defer xgConn.mu.Unlock()
  228. // 检测sql语句是查询则报错
  229. if switchSQLType(sql) == SQL_SELECT {
  230. return nil, errors.New("The executed SQL statement is a SELECT")
  231. }
  232. // 有传进来的参数
  233. if len(args) != 0 {
  234. values := []xuguValue{}
  235. //判断类型
  236. for _, param := range args {
  237. err := assertParamType(param, &values)
  238. if err != nil {
  239. return nil, err
  240. }
  241. }
  242. //send msg
  243. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  244. return nil, err
  245. }
  246. if err := sockSendExecute(xgConn); err != nil {
  247. return nil, err
  248. }
  249. } else {
  250. //send msg
  251. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  252. return nil, err
  253. }
  254. if err := sockSendExecute(xgConn); err != nil {
  255. return nil, err
  256. }
  257. }
  258. //recv msg
  259. aR, err := xuguSockRecvMsg(xgConn)
  260. if err != nil {
  261. return nil, err
  262. }
  263. switch aR.rt {
  264. case selectResult:
  265. return nil, errors.New("exec is Query error")
  266. case errInfo:
  267. return nil, errors.New(string(aR.e.ErrStr))
  268. case warnInfo:
  269. return nil, errors.New(string(aR.w.WarnStr))
  270. case updateResult:
  271. return &xuguResult{affectedRows: int64(aR.u.UpdateNum), insertId: int64(0)}, nil
  272. case insertResult:
  273. return &xuguResult{affectedRows: int64(0), insertId: int64(aR.i.RowidLen)}, nil
  274. default:
  275. return &xuguResult{
  276. affectedRows: int64(0),
  277. insertId: int64(0),
  278. }, nil
  279. }
  280. }
  281. func (xgConn *xuguConn) exec(sql string, args []driver.Value) (driver.Result, error) {
  282. // 有传进来的参数
  283. if len(args) != 0 {
  284. values := []xuguValue{}
  285. //判断类型
  286. for _, param := range args {
  287. err := assertParamType(param, &values)
  288. if err != nil {
  289. return nil, err
  290. }
  291. }
  292. //send msg
  293. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  294. return nil, err
  295. }
  296. if err := sockSendExecute(xgConn); err != nil {
  297. return nil, err
  298. }
  299. } else {
  300. //send msg
  301. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  302. return nil, err
  303. }
  304. if err := sockSendExecute(xgConn); err != nil {
  305. return nil, err
  306. }
  307. }
  308. //recv msg
  309. aR, err := xuguSockRecvMsg(xgConn)
  310. if err != nil {
  311. return nil, err
  312. }
  313. switch aR.rt {
  314. case selectResult:
  315. return nil, errors.New("exec is Query error")
  316. case updateResult:
  317. return &xuguResult{affectedRows: int64(aR.u.UpdateNum), insertId: int64(0)}, nil
  318. case insertResult:
  319. return &xuguResult{affectedRows: int64(0), insertId: int64(binary.LittleEndian.Uint64(aR.i.RowidData))}, nil
  320. case errInfo:
  321. return nil, errors.New(string(aR.e.ErrStr))
  322. case warnInfo:
  323. return nil, errors.New(string(aR.w.WarnStr))
  324. default:
  325. return &xuguResult{
  326. affectedRows: int64(0),
  327. insertId: int64(0),
  328. }, nil
  329. }
  330. }