// xugu_conn.go (7.2 KB)

  1. package xugu
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql/driver"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "sync"
  10. )
// xuguConn is the per-connection state used by the methods in this file: the
// embedded DSN settings, the underlying network connection, the mutex that
// serialises operations on it, and prepared-statement bookkeeping and buffers.
type xuguConn struct {
	dsnConfig
	conn        net.Conn   // underlying network connection; closed by Close
	mu          sync.Mutex // held by Close, Query, Ping, Prepare and Exec for the whole call
	useSSL      bool       // whether encryption is used
	havePrepare int        //default 0
	prepareNo   int        // numeric suffix appended to the next prepared-statement name; incremented by xuguPrepare
	prepareName string     // name of the most recently prepared statement; set by xuguPrepare
	//presPrepareCata *Result
	errStr   []byte
	sendBuff bytes.Buffer
	readBuff buffer // reset by Ping after a successful round trip; xuguPrepare prints buf[idx:] as the server message
}
// dsnConfig holds the connection settings, all kept as strings.
// NOTE(review): presumably filled in by the DSN parser, which is not visible
// in this file — confirm.
type dsnConfig struct {
	IP             string
	Port           string
	Database       string
	User           string
	Password       string
	Encryptor      string // decryption passphrase for the encrypted database
	CharSet        string // name of the character set used by the client
	TimeZone       string
	IsoLevel       string // transaction isolation level
	LockTimeout    string // lock timeout
	AutoCommit     string
	StrictCommit   string
	Result         string
	ReturnSchema   string
	ReturnCursorID string
	LobRet         string
	ReturnRowid    string
	Version        string
}
  44. func (xgConn *xuguConn) Begin() (driver.Tx, error) {
  45. gt("xuguConn.Begin")
  46. defer gt("xuguConn.Begin end")
  47. return nil, nil
  48. }
  49. func (xgConn *xuguConn) Close() error {
  50. gt("xuguConn.Close")
  51. defer gt("xuguConn.Close end")
  52. xgConn.mu.Lock()
  53. defer xgConn.mu.Unlock()
  54. err := xgConn.conn.Close()
  55. if err != nil {
  56. fmt.Println("Close connection error")
  57. xgConn.mu.Unlock()
  58. return err
  59. }
  60. return nil
  61. }
  62. func (xgConn *xuguConn) Query(sql string,
  63. args []driver.Value) (driver.Rows, error) {
  64. gt("xuguConn.Query")
  65. defer gt("xuguConn.Query end")
  66. xgConn.mu.Lock()
  67. defer xgConn.mu.Unlock()
  68. // 检测sql语句不是查询则报错
  69. if switchSQLType(sql) != SQL_SELECT {
  70. return nil, errors.New("The executed SQL statement is not a SELECT")
  71. }
  72. // 有传进来的参数
  73. if len(args) != 0 {
  74. values := []xuguValue{}
  75. //判断类型
  76. for _, param := range args {
  77. err := assertParamType(param, &values)
  78. if err != nil {
  79. return nil, err
  80. }
  81. }
  82. //send msg
  83. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  84. return nil, err
  85. }
  86. if err := sockSendExecute(xgConn); err != nil {
  87. return nil, err
  88. }
  89. } else {
  90. //send msg
  91. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  92. return nil, err
  93. }
  94. if err := sockSendExecute(xgConn); err != nil {
  95. return nil, err
  96. }
  97. }
  98. //recv msg
  99. aR, err := xuguSockRecvMsg(xgConn)
  100. if err != nil {
  101. return nil, err
  102. }
  103. switch aR.rt {
  104. case selectResult:
  105. fmt.Printf("selectResult %#v\n", aR.s)
  106. rows := &xuguRows{
  107. rows_conn: xgConn,
  108. results: aR.s,
  109. colIdx: 0,
  110. prepared: false,
  111. }
  112. return rows, nil
  113. case errInfo:
  114. return nil, errors.New(string(aR.e.ErrStr))
  115. case warnInfo:
  116. return nil, errors.New(string(aR.w.WarnStr))
  117. default:
  118. }
  119. return nil, errors.New("xugu Query error")
  120. }
  121. func (xgConn *xuguConn) Ping(ctx context.Context) error {
  122. gt("xuguConn.Ping")
  123. defer gt("xuguConn.Ping end")
  124. //send
  125. xgConn.mu.Lock()
  126. defer xgConn.mu.Unlock()
  127. sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0)
  128. sockSendExecute(xgConn)
  129. _, err := xuguSockRecvMsg(xgConn)
  130. if err != nil {
  131. fmt.Println("Ping err : ", err.Error())
  132. return err
  133. }
  134. xgConn.readBuff.reset()
  135. return nil
  136. }
  137. func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
  138. gt("xuguConn.Prepare")
  139. defer gt("xuguConn.Prepare end")
  140. xgConn.mu.Lock()
  141. defer xgConn.mu.Unlock()
  142. //判断sql类型
  143. switch switchSQLType(sql) {
  144. case SQL_PROCEDURE:
  145. return nil, errors.New("Prepare does not support stored procedures")
  146. case SQL_UNKNOWN:
  147. return nil, errors.New("Unknown SQL statement type")
  148. case SQL_CREATE:
  149. return nil, errors.New("Prepare does not support DDL.")
  150. }
  151. //发送创建prepare
  152. prepareName := "GTONG"
  153. err := xuguPrepare(xgConn, sql, prepareName)
  154. if err != nil {
  155. return nil, err
  156. }
  157. count := assertParamCount(sql)
  158. stmt := &xuguStmt{
  159. stmt_conn: xgConn,
  160. prepared: true,
  161. prename: make([]byte, 128),
  162. curopend: false,
  163. curname: make([]byte, 128),
  164. paramCount: count,
  165. mysql: sql,
  166. }
  167. stmt.prename = []byte(fmt.Sprintf("? %s", xgConn.prepareName))
  168. fmt.Println("stmt.prename ", stmt.prename, string(stmt.prename))
  169. return stmt, nil
  170. }
  171. func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
  172. fmt.Println("\n ---xuguPrepare")
  173. prepareName = fmt.Sprintf("%s%d", prepareName, pConn.prepareNo)
  174. sqlRet := fmt.Sprintf("PREPARE %s AS %s", prepareName, cmd_sql)
  175. pConn.prepareName = prepareName
  176. fmt.Println("pConn.prepareName ", pConn.prepareName)
  177. pConn.prepareNo++
  178. //send msg
  179. sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
  180. sockSendExecute(pConn)
  181. //recv msg
  182. _, err := xuguSockRecvMsg(pConn)
  183. if err != nil {
  184. fmt.Println("xuguPrepare parseMsg(&pConn.readBuff, pConn)")
  185. return err
  186. }
  187. fmt.Println("Message from server:", pConn.readBuff.buf[pConn.readBuff.idx:])
  188. fmt.Println("Message from server:", string(pConn.readBuff.buf[pConn.readBuff.idx:]))
  189. fmt.Println("\n ---xuguPrepare end")
  190. return nil
  191. }
  192. func (xgConn *xuguConn) Exec(sql string,
  193. args []driver.Value) (driver.Result, error) {
  194. gt("xuguConn.Exec")
  195. defer gt("xuguConn.Exec end")
  196. xgConn.mu.Lock()
  197. defer xgConn.mu.Unlock()
  198. // 检测sql语句是查询则报错
  199. if switchSQLType(sql) == SQL_SELECT {
  200. return nil, errors.New("The executed SQL statement is a SELECT")
  201. }
  202. // 有传进来的参数
  203. if len(args) != 0 {
  204. values := []xuguValue{}
  205. //判断类型
  206. for _, param := range args {
  207. err := assertParamType(param, &values)
  208. if err != nil {
  209. return nil, err
  210. }
  211. }
  212. //send msg
  213. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  214. return nil, err
  215. }
  216. if err := sockSendExecute(xgConn); err != nil {
  217. return nil, err
  218. }
  219. } else {
  220. //send msg
  221. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  222. return nil, err
  223. }
  224. if err := sockSendExecute(xgConn); err != nil {
  225. return nil, err
  226. }
  227. }
  228. //recv msg
  229. aR, err := xuguSockRecvMsg(xgConn)
  230. if err != nil {
  231. return nil, err
  232. }
  233. switch aR.rt {
  234. case selectResult:
  235. return nil, errors.New("exec is Query error")
  236. case errInfo:
  237. return nil, errors.New(string(aR.e.ErrStr))
  238. case warnInfo:
  239. return nil, errors.New(string(aR.w.WarnStr))
  240. default:
  241. return &xuguResult{
  242. affectedRows: int64(0),
  243. insertId: int64(0),
  244. }, nil
  245. }
  246. }
  247. func (xgConn *xuguConn) exec(sql string, args []driver.Value) (driver.Result, error) {
  248. // 有传进来的参数
  249. if len(args) != 0 {
  250. values := []xuguValue{}
  251. //判断类型
  252. for _, param := range args {
  253. err := assertParamType(param, &values)
  254. if err != nil {
  255. return nil, err
  256. }
  257. }
  258. //send msg
  259. if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
  260. return nil, err
  261. }
  262. if err := sockSendExecute(xgConn); err != nil {
  263. return nil, err
  264. }
  265. } else {
  266. //send msg
  267. if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
  268. return nil, err
  269. }
  270. if err := sockSendExecute(xgConn); err != nil {
  271. return nil, err
  272. }
  273. }
  274. //recv msg
  275. aR, err := xuguSockRecvMsg(xgConn)
  276. if err != nil {
  277. return nil, err
  278. }
  279. switch aR.rt {
  280. case selectResult:
  281. return nil, errors.New("exec is Query error")
  282. case errInfo:
  283. return nil, errors.New(string(aR.e.ErrStr))
  284. case warnInfo:
  285. return nil, errors.New(string(aR.w.WarnStr))
  286. default:
  287. return &xuguResult{
  288. affectedRows: int64(0),
  289. insertId: int64(0),
  290. }, nil
  291. }
  292. }