xugu_conn.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. package xugu
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql/driver"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "sync"
  10. )
  11. type xuguConn struct {
  12. dsnConfig
  13. conn net.Conn
  14. mu sync.Mutex
  15. Type HANDLE_TYPE // 句柄类型
  16. useSSL bool // 是否使用加密
  17. havePrepare int //default 0
  18. bkChar byte
  19. prepareNo int //fashengqi
  20. prepareName string
  21. //presPrepareCata *Result
  22. errStr []byte
  23. sendBuff bytes.Buffer
  24. readBuff buffer
  25. }
  26. type dsnConfig struct {
  27. IP string
  28. Port string
  29. Database string
  30. User string
  31. Password string
  32. Encryptor string //加密库的解密口令
  33. CharSet string //客户端使用的字符集名
  34. TimeZone string
  35. IsoLevel string //事务隔离级别
  36. LockTimeout string //加锁超时
  37. AutoCommit string
  38. StrictCommit string
  39. Result string
  40. ReturnSchema string
  41. ReturnCursorID string
  42. LobRet string
  43. ReturnRowid string
  44. Version string
  45. }
  46. func (xgConn *xuguConn) Get_error() error {
  47. return nil
  48. }
  49. //SELECT 1 FROM DUAL
  50. func (xgConn *xuguConn) Begin() (driver.Tx, error) {
  51. _, err := xgConn.exec("set auto_commit off;")
  52. if err != nil {
  53. return nil, err
  54. }
  55. return &xuguTx{tconn: xgConn}, nil
  56. }
  57. var prepareCount = 0
  58. func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
  59. xgConn.mu.Lock()
  60. fmt.Println(">>>>>(xgConn *xuguConn) Prepare(query string)")
  61. prepareName := fmt.Sprintf("GTONG%d", prepareCount)
  62. prepareCount++
  63. //判断sql类型
  64. switch switchSQLType(sql) {
  65. case SQL_PROCEDURE:
  66. return nil, errors.New("Prepare does not support stored procedures")
  67. case SQL_UNKNOWN:
  68. return nil, errors.New("Unknown SQL statement type")
  69. case SQL_CREATE:
  70. return nil, errors.New("Prepare does not support DDL.")
  71. }
  72. //参数个数
  73. parser := &xuguParse{
  74. bind_type: 0,
  75. param_count: 0,
  76. }
  77. count := parser.assertParamCount(sql)
  78. parser.param_count = count
  79. fmt.Println("语句 参数个数:", count)
  80. //发送创建prepare
  81. err := xuguPrepare(xgConn, sql, prepareName)
  82. if err != nil {
  83. return nil, err
  84. }
  85. //sockSendPutStatement(xgConn, []byte(sql))
  86. stmt := &xuguStmt{
  87. stmt_conn: xgConn,
  88. prepared: true,
  89. prename: make([]byte, 128),
  90. curopend: false,
  91. curname: make([]byte, 128),
  92. paramCount: count,
  93. mysql: sql,
  94. parser: parser,
  95. }
  96. prepareName = fmt.Sprintf("? %s ", prepareName)
  97. stmt.prename = []byte(prepareName)
  98. xgConn.prepareName = prepareName
  99. xgConn.mu.Unlock()
  100. return stmt, nil
  101. }
  102. func (xgConn *xuguConn) Close() error {
  103. fmt.Println("Close connection")
  104. err := xgConn.conn.Close()
  105. if err != nil {
  106. fmt.Println("Close connection error")
  107. return err
  108. }
  109. return nil
  110. }
  111. func (xgConn *xuguConn) Exec(sql string,
  112. args []driver.Value) (driver.Result, error) {
  113. fmt.Printf(">>> (xgConn *xuguConn) Exec: %s, %v\n", sql, args)
  114. if switchSQLType(sql) == SQL_SELECT {
  115. return nil, errors.New("The executed SQL statement is not a SELECT")
  116. }
  117. //有参数传进来
  118. if len(args) != 0 {
  119. //创建prepare
  120. _, err := xgConn.Prepare(sql)
  121. if err != nil {
  122. return nil, err
  123. }
  124. //
  125. parser := &xuguParse{
  126. bind_type: 0,
  127. param_count: len(args),
  128. }
  129. for pos, param := range args {
  130. parser.assertParamType(param, pos)
  131. //fmt.Printf("Field Name: %s, Field Type: %s, Field Value: %v\n", v1, v1, v1)
  132. }
  133. sockSendPutStatement(xgConn, []byte(xgConn.prepareName), &parser.values, len(args))
  134. XGC_Execute(xgConn)
  135. //处理服务器返回
  136. xgConn.conn.Read(xgConn.readBuff.buf)
  137. readBuf := &xgConn.readBuff
  138. fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:])
  139. fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf))
  140. aR, err := parseMsg(readBuf, xgConn)
  141. if err != nil {
  142. return nil, err
  143. }
  144. switch aR.rt {
  145. case insertResult:
  146. return &xuguResult{
  147. affectedRows: int64(0),
  148. insertId: int64(0),
  149. }, nil
  150. case errInfo:
  151. return nil, errors.New(string(aR.e.ErrStr))
  152. case warnInfo:
  153. return nil, errors.New(string(aR.w.WarnStr))
  154. default:
  155. return &xuguResult{
  156. affectedRows: int64(0),
  157. insertId: int64(0),
  158. }, nil
  159. }
  160. }
  161. rs, err := xgConn.exec(sql)
  162. if err != nil {
  163. return nil, err
  164. }
  165. return rs, nil
  166. }
  167. func (xgConn *xuguConn) exec(sql string) (driver.Result, error) {
  168. sockSendPutStatement(xgConn, []byte(sql), nil, 0)
  169. XGC_Execute(xgConn)
  170. //处理服务器返回
  171. xgConn.conn.Read(xgConn.readBuff.buf)
  172. readBuf := &xgConn.readBuff
  173. fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:])
  174. fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf))
  175. aR, err := parseMsg(readBuf, xgConn)
  176. if err != nil {
  177. return nil, err
  178. }
  179. switch aR.rt {
  180. case insertResult:
  181. return &xuguResult{
  182. affectedRows: int64(0),
  183. insertId: int64(0),
  184. }, nil
  185. case errInfo:
  186. return nil, errors.New(string(aR.e.ErrStr))
  187. case warnInfo:
  188. return nil, errors.New(string(aR.w.WarnStr))
  189. default:
  190. return &xuguResult{
  191. affectedRows: int64(0),
  192. insertId: int64(0),
  193. }, nil
  194. }
  195. }
  196. func (xgConn *xuguConn) Query(sql string,
  197. args []driver.Value) (driver.Rows, error) {
  198. fmt.Println(">>> (xgConn *xuguConn) Query")
  199. if switchSQLType(sql) != SQL_SELECT {
  200. return nil, errors.New("The executed SQL statement is not a SELECT")
  201. }
  202. if len(args) != 0 {
  203. parser := &xuguParse{
  204. bind_type: 0,
  205. param_count: len(args),
  206. }
  207. for pos, param := range args {
  208. err := parser.assertParamType(param, pos)
  209. if err != nil {
  210. return nil, err
  211. }
  212. }
  213. sockSendPutStatement(xgConn, []byte(xgConn.prepareName), &parser.values, len(args))
  214. XGC_Execute(xgConn)
  215. //处理服务器返回
  216. xgConn.conn.Read(xgConn.readBuff.buf)
  217. readBuf := &xgConn.readBuff
  218. fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:])
  219. fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf))
  220. aR, err := parseMsg(readBuf, xgConn)
  221. if err != nil {
  222. return nil, err
  223. }
  224. switch aR.rt {
  225. case selectResult:
  226. rows := &xuguRows{
  227. rows_conn: xgConn,
  228. results: aR.s,
  229. lastRowRelt: int(0),
  230. lastRelt: int(0),
  231. prepared: false,
  232. }
  233. return rows, nil
  234. case errInfo:
  235. return nil, errors.New(string(aR.e.ErrStr))
  236. case warnInfo:
  237. return nil, errors.New(string(aR.w.WarnStr))
  238. default:
  239. }
  240. }
  241. //最终发送消息给服务器
  242. sockSendPutStatement(xgConn, []byte(sql), nil, 0)
  243. XGC_Execute(xgConn)
  244. //----------------处理服务器查询返回
  245. xgConn.conn.Read(xgConn.readBuff.buf)
  246. //fmt.Println("Message from server:", (buffer[:n]))
  247. fmt.Println("Message from server:", string(xgConn.readBuff.buf))
  248. //fmt.Println("\nMessage from server2:", (stmt.stmt_conn.readBuff.buf))
  249. results, err := parseSelectResult(&xgConn.readBuff)
  250. if err != nil {
  251. fmt.Println("parseSelectResult parseSelectResult err", err.Error())
  252. return nil, err
  253. }
  254. rows := &xuguRows{
  255. rows_conn: xgConn,
  256. results: results,
  257. lastRowRelt: int(0),
  258. lastRelt: int(0),
  259. prepared: false,
  260. }
  261. return rows, nil
  262. }
  263. func (xgConn *xuguConn) Ping(ctx context.Context) error {
  264. sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0)
  265. XGC_Execute(xgConn)
  266. xgConn.conn.Read(xgConn.readBuff.buf)
  267. //fmt.Println("Message from server:", (buffer[:n]))
  268. fmt.Println("Message from server:", string(xgConn.readBuff.buf))
  269. //fmt.Println("\nMessage from server2:", (stmt.stmt_conn.readBuff.buf))
  270. _, err := parseSelectResult(&xgConn.readBuff)
  271. if err != nil {
  272. fmt.Println("parseSelectResult parseSelectResult err", err.Error())
  273. return err
  274. }
  275. xgConn.readBuff.idx = 0
  276. xgConn.readBuff.buf = make([]byte, 2048)
  277. return nil
  278. }
  279. func (pConn *xuguConn) ExecContext(ctx context.Context,
  280. query string, args []driver.NamedValue) (driver.Result, error) {
  281. Value, err := namedValueToValue(args)
  282. if err != nil {
  283. return nil, err
  284. }
  285. return pConn.Exec(query, Value)
  286. }
  287. func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
  288. __Par := make([]driver.Value, len(named))
  289. for pos, Param := range named {
  290. if len(Param.Name) > 0 {
  291. return nil, errors.New("Driver does not support the use of Named Parameters")
  292. }
  293. __Par[pos] = Param.Value
  294. }
  295. return __Par, nil
  296. }