xugu_conn.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. package xugu
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql/driver"
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "net"
  10. "sync"
  11. )
  12. type xuguConn struct {
  13. dsnConfig
  14. conn net.Conn
  15. mu sync.Mutex
  16. useSSL bool // 是否使用加密
  17. havePrepare int //default 0
  18. prepareNo int
  19. prepareName string
  20. //presPrepareCata *Result
  21. errStr []byte
  22. sendBuff bytes.Buffer
  23. readBuff buffer
  24. }
  // dsnConfig carries the settings of a data source. Every option is kept
  // as a plain string; nothing in this declaration parses or validates them.
  type dsnConfig struct {
  	// Server endpoint, target database and login.
  	IP       string
  	Port     string
  	Database string
  	User     string
  	Password string

  	Encryptor string // decryption passphrase for an encrypted database
  	CharSet   string // name of the character set used by the client
  	TimeZone  string

  	// Transaction behaviour.
  	IsoLevel     string // transaction isolation level
  	LockTimeout  string // lock wait timeout
  	AutoCommit   string
  	StrictCommit string

  	// Remaining options; their exact meaning is not visible in this file.
  	Result         string
  	ReturnSchema   string
  	ReturnCursorID string
  	LobRet         string
  	ReturnRowid    string
  	Version        string
  }
  45. func (xgConn *xuguConn) Begin() (driver.Tx, error) {
  46. gt("xuguConn.Begin")
  47. defer gt("xuguConn.Begin end")
  48. return &xuguTx{tconn: xgConn}, nil
  49. }
  50. func (xgConn *xuguConn) Close() error {
  51. gt("xuguConn.Close")
  52. defer gt("xuguConn.Close end")
  53. xgConn.mu.Lock()
  54. defer xgConn.mu.Unlock()
  55. err := xgConn.conn.Close()
  56. if err != nil {
  57. fmt.Println("Close connection error")
  58. xgConn.mu.Unlock()
  59. return err
  60. }
  61. return nil
  62. }
  63. func (xgConn *xuguConn) Query(sql string,
  64. args []driver.Value) (driver.Rows, error) {
  65. gt("xuguConn.Query")
  66. defer gt("xuguConn.Query end")
  67. xgConn.mu.Lock()
  68. defer xgConn.mu.Unlock()
  69. // 检测sql语句不是查询则报错
  70. if switchSQLType(sql) != SQL_SELECT {
  71. return nil, errors.New("The executed SQL statement is not a SELECT")
  72. }
  73. // 有传进来的参数
  74. if len(args) != 0 {
  75. values := []xuguValue{}
  76. //判断类型
  77. for _, param := range args {
  78. err := assertParamType(param, &values)
  79. if err != nil {
  80. return nil, err
  81. }
  82. }
  83. //send msg
  84. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  85. return nil, err
  86. }
  87. if err := sockSendExecute(xgConn); err != nil {
  88. return nil, err
  89. }
  90. } else {
  91. //send msg
  92. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  93. return nil, err
  94. }
  95. if err := sockSendExecute(xgConn); err != nil {
  96. return nil, err
  97. }
  98. }
  99. //recv msg
  100. aR, err := xuguSockRecvMsg(xgConn)
  101. if err != nil {
  102. return nil, err
  103. }
  104. switch aR.rt {
  105. case selectResult:
  106. rows := &xuguRows{
  107. rows_conn: xgConn,
  108. results: aR.s,
  109. colIdx: 0,
  110. prepared: false,
  111. }
  112. return rows, nil
  113. case errInfo:
  114. return nil, errors.New(string(aR.e.ErrStr))
  115. case warnInfo:
  116. return nil, errors.New(string(aR.w.WarnStr))
  117. default:
  118. }
  119. return nil, errors.New("xugu Query error")
  120. }
  121. func (xgConn *xuguConn) Ping(ctx context.Context) error {
  122. gt("xuguConn.Ping")
  123. defer gt("xuguConn.Ping end")
  124. //send
  125. xgConn.mu.Lock()
  126. defer xgConn.mu.Unlock()
  127. sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0)
  128. sockSendExecute(xgConn)
  129. _, err := xuguSockRecvMsg(xgConn)
  130. if err != nil {
  131. fmt.Println("Ping err : ", err.Error())
  132. return err
  133. }
  134. xgConn.readBuff.reset()
  135. return nil
  136. }
  137. func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
  138. gt("xuguConn.Prepare")
  139. defer gt("xuguConn.Prepare end")
  140. xgConn.mu.Lock()
  141. defer xgConn.mu.Unlock()
  142. //判断sql类型
  143. switch switchSQLType(sql) {
  144. case SQL_PROCEDURE:
  145. return nil, errors.New("Prepare does not support stored procedures")
  146. case SQL_UNKNOWN:
  147. return nil, errors.New("Unknown SQL statement type")
  148. case SQL_CREATE:
  149. return nil, errors.New("Prepare does not support DDL.")
  150. }
  151. //发送创建prepare
  152. prepareName := "GTONG"
  153. err := xuguPrepare(xgConn, sql, prepareName)
  154. if err != nil {
  155. return nil, err
  156. }
  157. count := assertParamCount(sql)
  158. stmt := &xuguStmt{
  159. stmt_conn: xgConn,
  160. prepared: true,
  161. prename: make([]byte, 128),
  162. curopend: false,
  163. curname: make([]byte, 128),
  164. paramCount: count,
  165. mysql: sql,
  166. }
  167. stmt.prename = []byte(fmt.Sprintf("? %s", xgConn.prepareName))
  168. return stmt, nil
  169. }
  170. func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
  171. fmt.Println("\n ---xuguPrepare")
  172. prepareName = fmt.Sprintf("%s%d", prepareName, pConn.prepareNo)
  173. sqlRet := fmt.Sprintf("PREPARE %s AS %s", prepareName, cmd_sql)
  174. pConn.prepareName = prepareName
  175. pConn.prepareNo++
  176. //send msg
  177. sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
  178. sockSendExecute(pConn)
  179. //recv msg
  180. _, err := xuguSockRecvMsg(pConn)
  181. if err != nil {
  182. fmt.Println("xuguPrepare parseMsg(&pConn.readBuff, pConn)")
  183. return err
  184. }
  185. fmt.Println("\n ---xuguPrepare end")
  186. return nil
  187. }
  188. func (xgConn *xuguConn) Exec(sql string,
  189. args []driver.Value) (driver.Result, error) {
  190. gt("xuguConn.Exec")
  191. defer gt("xuguConn.Exec end")
  192. xgConn.mu.Lock()
  193. defer xgConn.mu.Unlock()
  194. // 检测sql语句是查询则报错
  195. if switchSQLType(sql) == SQL_SELECT {
  196. return nil, errors.New("The executed SQL statement is a SELECT")
  197. }
  198. // 有传进来的参数
  199. if len(args) != 0 {
  200. values := []xuguValue{}
  201. //判断类型
  202. for _, param := range args {
  203. err := assertParamType(param, &values)
  204. if err != nil {
  205. return nil, err
  206. }
  207. }
  208. //send msg
  209. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  210. return nil, err
  211. }
  212. if err := sockSendExecute(xgConn); err != nil {
  213. return nil, err
  214. }
  215. } else {
  216. //send msg
  217. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  218. return nil, err
  219. }
  220. if err := sockSendExecute(xgConn); err != nil {
  221. return nil, err
  222. }
  223. }
  224. //recv msg
  225. aR, err := xuguSockRecvMsg(xgConn)
  226. if err != nil {
  227. return nil, err
  228. }
  229. switch aR.rt {
  230. case selectResult:
  231. return nil, errors.New("exec is Query error")
  232. case errInfo:
  233. return nil, errors.New(string(aR.e.ErrStr))
  234. case warnInfo:
  235. return nil, errors.New(string(aR.w.WarnStr))
  236. case updateResult:
  237. return &xuguResult{affectedRows: int64(aR.u.UpdateNum), insertId: int64(0)}, nil
  238. case insertResult:
  239. return &xuguResult{affectedRows: int64(0), insertId: int64(aR.i.RowidLen)}, nil
  240. default:
  241. return &xuguResult{
  242. affectedRows: int64(0),
  243. insertId: int64(0),
  244. }, nil
  245. }
  246. }
  247. func (xgConn *xuguConn) exec(sql string, args []driver.Value) (driver.Result, error) {
  248. // 有传进来的参数
  249. if len(args) != 0 {
  250. values := []xuguValue{}
  251. //判断类型
  252. for _, param := range args {
  253. err := assertParamType(param, &values)
  254. if err != nil {
  255. return nil, err
  256. }
  257. }
  258. //send msg
  259. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  260. return nil, err
  261. }
  262. if err := sockSendExecute(xgConn); err != nil {
  263. return nil, err
  264. }
  265. } else {
  266. //send msg
  267. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  268. return nil, err
  269. }
  270. if err := sockSendExecute(xgConn); err != nil {
  271. return nil, err
  272. }
  273. }
  274. //recv msg
  275. aR, err := xuguSockRecvMsg(xgConn)
  276. if err != nil {
  277. return nil, err
  278. }
  279. switch aR.rt {
  280. case selectResult:
  281. return nil, errors.New("exec is Query error")
  282. case updateResult:
  283. return &xuguResult{affectedRows: int64(aR.u.UpdateNum), insertId: int64(0)}, nil
  284. case insertResult:
  285. return &xuguResult{affectedRows: int64(0), insertId: int64(binary.LittleEndian.Uint64(aR.i.RowidData))}, nil
  286. case errInfo:
  287. return nil, errors.New(string(aR.e.ErrStr))
  288. case warnInfo:
  289. return nil, errors.New(string(aR.w.WarnStr))
  290. default:
  291. return &xuguResult{
  292. affectedRows: int64(0),
  293. insertId: int64(0),
  294. }, nil
  295. }
  296. }