package xugu import ( "bytes" "context" "database/sql/driver" "errors" "fmt" "net" "sync" ) type xuguConn struct { dsnConfig conn net.Conn mu sync.Mutex Type HANDLE_TYPE // 句柄类型 useSSL bool // 是否使用加密 havePrepare int //default 0 bkChar byte prepareNo int //fashengqi prepareName string //presPrepareCata *Result errStr []byte sendBuff bytes.Buffer readBuff buffer } type dsnConfig struct { IP string Port string Database string User string Password string Encryptor string //加密库的解密口令 CharSet string //客户端使用的字符集名 TimeZone string IsoLevel string //事务隔离级别 LockTimeout string //加锁超时 AutoCommit string StrictCommit string Result string ReturnSchema string ReturnCursorID string LobRet string ReturnRowid string Version string } func (xgConn *xuguConn) Get_error() error { return nil } //SELECT 1 FROM DUAL func (xgConn *xuguConn) Begin() (driver.Tx, error) { _, err := xgConn.exec("set auto_commit off;") if err != nil { return nil, err } return &xuguTx{tconn: xgConn}, nil } var prepareCount = 0 func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) { xgConn.mu.Lock() fmt.Println(">>>>>(xgConn *xuguConn) Prepare(query string)") prepareName := fmt.Sprintf("GTONG%d", prepareCount) prepareCount++ //判断sql类型 switch switchSQLType(sql) { case SQL_PROCEDURE: return nil, errors.New("Prepare does not support stored procedures") case SQL_UNKNOWN: return nil, errors.New("Unknown SQL statement type") case SQL_CREATE: return nil, errors.New("Prepare does not support DDL.") } //参数个数 parser := &xuguParse{ bind_type: 0, param_count: 0, } count := parser.assertParamCount(sql) parser.param_count = count fmt.Println("语句 参数个数:", count) //发送创建prepare err := xuguPrepare(xgConn, sql, prepareName) if err != nil { return nil, err } //sockSendPutStatement(xgConn, []byte(sql)) stmt := &xuguStmt{ stmt_conn: xgConn, prepared: true, prename: make([]byte, 128), curopend: false, curname: make([]byte, 128), paramCount: count, mysql: sql, parser: parser, } prepareName = fmt.Sprintf("? %s ", prepareName) stmt.prename = []byte(prepareName) xgConn.prepareName = prepareName xgConn.mu.Unlock() return stmt, nil } func (xgConn *xuguConn) Close() error { fmt.Println("Close connection") err := xgConn.conn.Close() if err != nil { fmt.Println("Close connection error") return err } return nil } func (xgConn *xuguConn) Exec(sql string, args []driver.Value) (driver.Result, error) { fmt.Printf(">>> (xgConn *xuguConn) Exec: %s, %v\n", sql, args) if switchSQLType(sql) == SQL_SELECT { return nil, errors.New("The executed SQL statement is not a SELECT") } //有参数传进来 if len(args) != 0 { //创建prepare _, err := xgConn.Prepare(sql) if err != nil { return nil, err } // parser := &xuguParse{ bind_type: 0, param_count: len(args), } for pos, param := range args { parser.assertParamType(param, pos) //fmt.Printf("Field Name: %s, Field Type: %s, Field Value: %v\n", v1, v1, v1) } sockSendPutStatement(xgConn, []byte(xgConn.prepareName), &parser.values, len(args)) XGC_Execute(xgConn) //处理服务器返回 xgConn.conn.Read(xgConn.readBuff.buf) readBuf := &xgConn.readBuff fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:]) fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf)) aR, err := parseMsg(readBuf, xgConn) if err != nil { return nil, err } switch aR.rt { case insertResult: return &xuguResult{ affectedRows: int64(0), insertId: int64(0), }, nil case errInfo: return nil, errors.New(string(aR.e.ErrStr)) case warnInfo: return nil, errors.New(string(aR.w.WarnStr)) default: return &xuguResult{ affectedRows: int64(0), insertId: int64(0), }, nil } } rs, err := xgConn.exec(sql) if err != nil { return nil, err } return rs, nil } func (xgConn *xuguConn) exec(sql string) (driver.Result, error) { sockSendPutStatement(xgConn, []byte(sql), nil, 0) XGC_Execute(xgConn) //处理服务器返回 xgConn.conn.Read(xgConn.readBuff.buf) readBuf := &xgConn.readBuff fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:]) fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf)) aR, err := parseMsg(readBuf, xgConn) if err != nil { return nil, err } switch aR.rt { case insertResult: return &xuguResult{ affectedRows: int64(0), insertId: int64(0), }, nil case errInfo: return nil, errors.New(string(aR.e.ErrStr)) case warnInfo: return nil, errors.New(string(aR.w.WarnStr)) default: return &xuguResult{ affectedRows: int64(0), insertId: int64(0), }, nil } } func (xgConn *xuguConn) Query(sql string, args []driver.Value) (driver.Rows, error) { fmt.Println(">>> (xgConn *xuguConn) Query") if switchSQLType(sql) != SQL_SELECT { return nil, errors.New("The executed SQL statement is not a SELECT") } if len(args) != 0 { parser := &xuguParse{ bind_type: 0, param_count: len(args), } for pos, param := range args { err := parser.assertParamType(param, pos) if err != nil { return nil, err } } sockSendPutStatement(xgConn, []byte(xgConn.prepareName), &parser.values, len(args)) XGC_Execute(xgConn) //处理服务器返回 xgConn.conn.Read(xgConn.readBuff.buf) readBuf := &xgConn.readBuff fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:]) fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf)) aR, err := parseMsg(readBuf, xgConn) if err != nil { return nil, err } switch aR.rt { case selectResult: rows := &xuguRows{ rows_conn: xgConn, results: aR.s, lastRowRelt: int(0), lastRelt: int(0), prepared: false, } return rows, nil case errInfo: return nil, errors.New(string(aR.e.ErrStr)) case warnInfo: return nil, errors.New(string(aR.w.WarnStr)) default: } } //最终发送消息给服务器 sockSendPutStatement(xgConn, []byte(sql), nil, 0) XGC_Execute(xgConn) //----------------处理服务器查询返回 xgConn.conn.Read(xgConn.readBuff.buf) //fmt.Println("Message from server:", (buffer[:n])) fmt.Println("Message from server:", string(xgConn.readBuff.buf)) //fmt.Println("\nMessage from server2:", (stmt.stmt_conn.readBuff.buf)) results, err := parseSelectResult(&xgConn.readBuff) if err != nil { fmt.Println("parseSelectResult parseSelectResult err", err.Error()) return nil, err } rows := &xuguRows{ rows_conn: xgConn, results: results, lastRowRelt: int(0), lastRelt: int(0), prepared: false, } return rows, nil } func (xgConn *xuguConn) Ping(ctx context.Context) error { sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0) XGC_Execute(xgConn) xgConn.conn.Read(xgConn.readBuff.buf) //fmt.Println("Message from server:", (buffer[:n])) fmt.Println("Message from server:", string(xgConn.readBuff.buf)) //fmt.Println("\nMessage from server2:", (stmt.stmt_conn.readBuff.buf)) _, err := parseSelectResult(&xgConn.readBuff) if err != nil { fmt.Println("parseSelectResult parseSelectResult err", err.Error()) return err } xgConn.readBuff.idx = 0 xgConn.readBuff.buf = make([]byte, 2048) return nil } func (pConn *xuguConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { Value, err := namedValueToValue(args) if err != nil { return nil, err } return pConn.Exec(query, Value) } func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { __Par := make([]driver.Value, len(named)) for pos, Param := range named { if len(Param.Name) > 0 { return nil, errors.New("Driver does not support the use of Named Parameters") } __Par[pos] = Param.Value } return __Par, nil }