Browse Source

需要修改

GTong 10 months ago
parent
commit
2d8eb47b4f
15 changed files with 708 additions and 798 deletions
  1. 171 0
      Untitled-1
  2. 77 133
      main.go
  3. 49 0
      test/db_exec_test.go
  4. 101 0
      test/db_query_test.go
  5. 4 0
      xugu/buffer.go
  6. 0 401
      xugu/discard.go
  7. 111 18
      xugu/xugu_conn.go
  8. 16 0
      xugu/xugu_define.go
  9. 0 43
      xugu/xugu_fields.go
  10. 151 37
      xugu/xugu_parse.go
  11. 2 25
      xugu/xugu_prepare.go
  12. 1 15
      xugu/xugu_rows.go
  13. 4 109
      xugu/xugu_sock.go
  14. 21 6
      xugu/xugu_stmt.go
  15. 0 11
      xugu/xugu_utils.go

+ 171 - 0
Untitled-1

@@ -0,0 +1,171 @@
+package main
+
+import (
+	"io"
+	"net"
+	"time"
+)
+
+const defaultBufSize = 4096
+const maxCachedBufSize = 256 * 1024
+
+// 一个用于读写的缓冲区。
+// 由于每个连接上的通信是同步的,因此可以这样做。
+// 换句话说,我们不能在同一个连接上同时进行读写。
+// 该缓冲区类似于 bufio.Reader/Writer,但具有零拷贝特性。
+// 还针对这种特定用例进行了高度优化。
+// 这个缓冲区由两个字节切片组成,使用双缓冲机制。
+type buffer struct {
+	buf     []byte // buf 是一个字节缓冲区,其长度和容量相等。
+	nc      net.Conn
+	idx     int
+	length  int
+	timeout time.Duration
+	dbuf    [2][]byte // dbuf 是一个包含两个字节切片的数组,用于支持这个缓冲区
+	flipcnt uint      // flipcnt 是双缓冲机制中的当前缓冲区计数器
+}
+
+// newBuffer 分配并返回一个新的缓冲区。
+func newBuffer(nc net.Conn) buffer {
+	fg := make([]byte, defaultBufSize)
+	return buffer{
+		buf:  fg,
+		nc:   nc,
+		dbuf: [2][]byte{fg, nil},
+	}
+}
+
+// flip 用后台缓冲区替换活动缓冲区
+// 这是一个延迟的翻转操作,只是简单地增加缓冲区计数器;
+// 实际的翻转将在下次调用 `buffer.fill` 时执行
+func (b *buffer) flip() {
+	b.flipcnt += 1
+}
+
+// fill 将数据读入缓冲区,直到至少包含 _need_ 字节的数据
+func (b *buffer) fill(need int) error {
+	n := b.length
+	// 将数据填充到双缓冲目标:如果我们在这个缓冲区上调用了 flip,
+	// 我们将复制到后台缓冲区,然后用网络数据填充它;否则我们只需在填充前将当前缓冲区的内容移到前面
+	dest := b.dbuf[b.flipcnt&1]
+
+	// 如果需要,增加缓冲区的大小以适应整个数据包。
+	if need > len(dest) {
+		// 向上舍入到下一个默认大小的倍数
+		dest = make([]byte, ((need/defaultBufSize)+1)*defaultBufSize)
+
+		// 如果分配的缓冲区不太大,将其移至支持存储,以防止在执行大读操作的应用程序上进行额外的分配
+		if len(dest) <= maxCachedBufSize {
+			b.dbuf[b.flipcnt&1] = dest
+		}
+	}
+
+	// 如果我们填充的是 fg 缓冲区,将现有数据移动到其起始位置。
+	// 如果我们填充的是 bg 缓冲区,复制数据
+	if n > 0 {
+		copy(dest[:n], b.buf[b.idx:])
+	}
+
+	b.buf = dest
+	b.idx = 0
+
+	for {
+		if b.timeout > 0 {
+			if err := b.nc.SetReadDeadline(time.Now().Add(b.timeout)); err != nil {
+				return err
+			}
+		}
+
+		nn, err := b.nc.Read(b.buf[n:])
+		n += nn
+
+		switch err {
+		case nil:
+			if n < need {
+				continue
+			}
+			b.length = n
+			return nil
+
+		case io.EOF:
+			if n >= need {
+				b.length = n
+				return nil
+			}
+			return io.ErrUnexpectedEOF
+
+		default:
+			return err
+		}
+	}
+}
+
+// 返回缓冲区中的下一个 N 个字节。
+// 返回的切片仅保证在下一次读取之前有效
+func (b *buffer) readNext(need int) ([]byte, error) {
+	if b.length < need {
+		// 重新填充
+		if err := b.fill(need); err != nil {
+			return nil, err
+		}
+	}
+
+	offset := b.idx
+	b.idx += need
+	b.length -= need
+	return b.buf[offset:b.idx], nil
+}
+
+// takeBuffer 返回一个具有请求大小的缓冲区。
+// 如果可能,从现有缓冲区返回一个切片。
+// 否则,创建一个更大的缓冲区。
+// 只能同时使用一个缓冲区(总共)。
+func (b *buffer) takeBuffer(length int) ([]byte, error) {
+	if b.length > 0 {
+		return nil, ErrBusyBuffer
+	}
+
+	// 先测试(便宜的)一般情况
+	if length <= cap(b.buf) {
+		return b.buf[:length], nil
+	}
+
+	if length < maxPacketSize {
+		b.buf = make([]byte, length)
+		return b.buf, nil
+	}
+
+	// 缓冲区大于我们要存储的内容。
+	return make([]byte, length), nil
+}
+
+// takeSmallBuffer 是一个快捷方式,如果已知长度小于 defaultBufSize,可以使用。
+// 只能同时使用一个缓冲区(总共)。
+func (b *buffer) takeSmallBuffer(length int) ([]byte, error) {
+	if b.length > 0 {
+		return nil, ErrBusyBuffer
+	}
+	return b.buf[:length], nil
+}
+
+// takeCompleteBuffer 返回完整的现有缓冲区。
+// 如果必要的缓冲区大小未知,可以使用此方法。
+// 返回的缓冲区的 cap 和 len 将相等。
+// 只能同时使用一个缓冲区(总共)。
+func (b *buffer) takeCompleteBuffer() ([]byte, error) {
+	if b.length > 0 {
+		return nil, ErrBusyBuffer
+	}
+	return b.buf, nil
+}
+
+// store 存储缓冲区 buf,如果适合的话。
+// 如果其大小适中,则更新缓冲区。
+func (b *buffer) store(buf []byte) error {
+	if b.length > 0 {
+		return ErrBusyBuffer
+	} else if cap(buf) <= maxPacketSize && cap(buf) > cap(b.buf) {
+		b.buf = buf[:cap(buf)]
+	}
+	return nil
+}

+ 77 - 133
main.go

@@ -4,152 +4,96 @@ import (
 	"database/sql"
 	"fmt"
 	"log"
+	"math/rand"
+	"sync"
+	"time"
 	_ "xugu_driver/xugu"
 )
 
-func main() {
-	db, err := sql.Open("xugusql", "IP=10.28.20.101;DB=SYSTEM;User=SYSDBA;PWD=SYSDBA;Port=5190;AUTO_COMMIT=on;CHAR_SET=UTF8")
-	//_, err = db.Exec("create table go_test(c1 int, c2 varchar);")
-	//db.Ping()
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	// err = db.Ping()
-	// if err != nil {
-	// 	fmt.Printf("connect xugu dbms ... failed\n")
-	// } else {
-
-	// 	fmt.Printf("connect xugu dbms ... ok\n")
-	// }
-
-	// stmt, err := db.Prepare("SELECT ? FROM gotest2;")
-	// stmt.Query("1")
-	//查询
-	// row, err := db.Query("select 1 from gotest2;")
-	// if err != nil {
-	// 	fmt.Println("查询错误", err)
-	// }
-	// col, _ := row.Columns()
-	// fmt.Println("row.Columns(): ", col)
-	// fmt.Println("len(col)", len(col))
-	// pvals := make([]interface{}, len(col))
-	// for key, _ := range pvals {
-	// 	dest := make([]byte, 216)
-	// 	pvals[key] = &dest
-	// } /* end for */
-	// for row.Next() {
-	// 	row.Scan(pvals...)
-	// 	for _, v := range pvals {
-	// 		//fmt.Printf("pvals:%v\t \n", v) binary.LittleEndian.Uint32(col.Col_Data)
-	// 		//fmt.Printf("第%d次:  %s\t", k, string(*(v.(*[]byte))))
-	// 		//	fmt.Printf("pvals:%v\t \n", (v.(*[]byte)))
-	// 		fmt.Printf("结果 %s\t", string(*(v.(*[]byte))))
-	// 	}
-	// 	fmt.Printf("\n")
-	// }
-
-	//消息类型为E
-	// r, err := db.Exec("drop table ta3;drop table ta4;")
-	// r.RowsAffected()
-	// db.Exec("INSERT INTO  gotest VALUES('gt3');")
-
-	//db.Prepare
-	// stmt, err := db.Prepare("INSERT INTO gotest values(?);")
-	// if err != nil {
-	// 	log.Fatal(err)
-	// }
-	// ret, err := stmt.Exec("gt32")
-	// if err != nil {
-	// 	log.Fatal(err)
-	// }
-	// fmt.Println("ret", ret)
+// 创建测试表
+const createTableQuery = `
+CREATE TABLE IF NOT EXISTS test_table (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    value VARCHAR(255) NOT NULL
+);`
 
-	//db.Exec
-	//db.Exec("INSERT INTO gotest2 VALUES(1, 'gt3');")
+// 插入数据的 SQL 语句
+const insertQuery = `INSERT INTO test_table (value) VALUES (?)`
 
-	//db.Query
-	// rows, err := db.Query("select * from gotest")
-	// if err != nil {
-	// 	log.Fatal(err)
-	// }
-	// var cols []string
-	// cols, err = rows.Columns()
-	// if err != nil {
-	// 	log.Fatal(err)
-	// }
-	// pvals := make([]interface{}, len(cols))
-	// for key, _ := range pvals {
-	// 	dest := make([]byte, 216)
-	// 	pvals[key] = &dest
-	// } /* end for */
-	// for rows.Next() {
-	// 	err = rows.Scan(pvals...)
-	// 	if err != nil {
-	// 		log.Fatal(err)
-	// 	}
-	// 	for _, v := range pvals {
-	// 		fmt.Printf("%s\t", string(*(v.(*[]byte))))
-	// 	}
-	// 	fmt.Printf("\n")
-	// }
-	// rows.Close()
+// 读取数据的 SQL 语句
+const selectQuery = `SELECT value FROM test_table WHERE id = ?`
 
-	//插入blob
-	//db.Exec("create table goblob(a int, b blob);")
-	// 打开文件
-	// file, err := os.Open("todo.txt")
-	// if err != nil {
-	// 	log.Fatal(err)
-	// }
-	// defer file.Close()
-	// 读取文件内容
-	// content, err := io.ReadAll(file)
-	// if err != nil {
-	// 	log.Fatal(err)
-	// }
-	//db.Exec("insert into goblob values(1, ?);", content)
-	//stmt方式插入
-	// stmt, err := db.Prepare("insert into goblob values(1, ?);")
-	// if err != nil {
-	// 	fmt.Println("error", err)
-	// }
-	// _, err = stmt.Exec(content)
-	// if err != nil {
-	// 	fmt.Println("error", err)
-	// }
+// 测试的并发度
+const concurrency = 10
 
-	//查询blob
-	rows, err := db.Query("select b from goblob;")
-	if err != nil {
-		fmt.Println("error", err)
-	}
-	var cols []string
-	cols, err = rows.Columns()
+// 初始化随机种子
+func init() {
+	rand.Seed(time.Now().UnixNano())
+}
+func main() {
+	db, err := sql.Open("xugusql", "IP=10.28.20.101;DB=SYSTEM;User=SYSDBA;PWD=SYSDBA;Port=5190;AUTO_COMMIT=on;CHAR_SET=UTF8")
 	if err != nil {
 		log.Fatal(err)
 	}
-	pvals := make([]interface{}, len(cols))
-	for key, _ := range pvals {
-		dest := make([]byte, 216)
 
-		pvals[key] = &dest
-	} /* end for */
-
-	for rows.Next() {
-		fmt.Println("rows.Next()")
-		err = rows.Scan(pvals...)
-		if err != nil {
-			log.Fatal(err)
-		}
-		for _, v := range pvals {
-			fmt.Printf("aa %s\t", string(*(v.(*[]byte))))
-		}
+	// 创建测试表
+	_, err = db.Exec(createTableQuery)
+	if err != nil {
+		log.Fatalf("Failed to create test table: %v", err)
+	}
 
-		fmt.Printf("\n")
+	// 使用 WaitGroup 管理并发操作
+	var wg sync.WaitGroup
+
+	// 记录成功和失败的操作次数
+	var successCount, failureCount int64
+	var mu sync.Mutex
+
+	// 运行并发操作
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				// 随机选择插入或查询操作
+				if rand.Intn(2) == 0 {
+					// 执行写操作  // `INSERT INTO test_table (value) VALUES (?)`
+					_, err := db.Exec(insertQuery, fmt.Sprintf("value-%d", rand.Int()))
+					mu.Lock()
+					if err != nil {
+						failureCount++
+					} else {
+						successCount++
+					}
+					mu.Unlock()
+				} else {
+					// 执行读操作
+					id := rand.Intn(1000) + 1
+					var value string //`SELECT value FROM test_table WHERE id = ?`
+					err := db.QueryRow(selectQuery, id).Scan(&value)
+					mu.Lock()
+					if err != nil {
+						failureCount++
+					} else {
+						successCount++
+					}
+					mu.Unlock()
+				}
+
+				// 打印当前成功和失败的操作次数
+				mu.Lock()
+				log.Printf("Success: %d, Failure: %d", successCount, failureCount)
+				mu.Unlock()
+
+				// 短暂休眠以防止过度频繁的请求
+				time.Sleep(100 * time.Millisecond)
+			}
+		}()
 	}
 
-	rows.Close()
-	db.Close()
+	// 等待所有操作完成(实际上这个程序不会停止,因为循环是无限的)
+	wg.Wait()
+
+	defer db.Close()
 
 }

+ 49 - 0
test/db_exec_test.go

@@ -0,0 +1,49 @@
+package xugutest
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"testing"
+	_ "xugu_driver/xugu"
+)
+
+func TestStmtExec(t *testing.T) {
+
+	//db.Exec("create table goblob(a int, b blob);")
+	// 打开文件
+	file, err := os.Open("C:/Program_GT/Code/Go/Work/xugu/xugu_driver/todo.txt")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer file.Close()
+	// 读取文件内容
+	content, err := io.ReadAll(file)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	//stmt方式插入
+	stmt, err := db.Prepare("insert into goblob values(1, ?);")
+	if err != nil {
+		fmt.Println("error", err)
+	}
+	_, err = stmt.Exec(content)
+	if err != nil {
+		fmt.Println("error", err)
+	}
+
+}
+
+func TestStmtExecContext(t *testing.T) {
+	_, err := db.ExecContext(context.Background(),
+		"create table go_test(c1 int, c2 varchar);")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	db.Close()
+
+}

+ 101 - 0
test/db_query_test.go

@@ -0,0 +1,101 @@
+package xugutest
+
+import (
+	"database/sql"
+	"fmt"
+	"log"
+	"testing"
+	_ "xugu_driver/xugu"
+)
+
+func openDb() (*sql.DB, error) {
+
+	return sql.Open("xugusql",
+		"IP=10.28.20.101;DB=SYSTEM;User=SYSDBA;PWD=SYSDBA;Port=5190;AUTO_COMMIT=on;CHAR_SET=UTF8")
+}
+
+var db *sql.DB
+
+func init() {
+	var err error
+	db, err = openDb()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+}
+
+func TestPing(t *testing.T) {
+	err := db.Ping()
+	if err != nil {
+		fmt.Printf("connect xugu dbms ... failed\n")
+	} else {
+
+		fmt.Printf("connect xugu dbms ... ok\n")
+	}
+
+}
+
+func TestQuery(t *testing.T) {
+	rows, err := db.Query("select * from gotest")
+	if err != nil {
+		log.Fatal(err)
+	}
+	var cols []string
+	cols, err = rows.Columns()
+	if err != nil {
+		log.Fatal(err)
+	}
+	pvals := make([]interface{}, len(cols))
+	for key, _ := range pvals {
+		dest := make([]byte, 216)
+		pvals[key] = &dest
+	} /* end for */
+	for rows.Next() {
+		err = rows.Scan(pvals...)
+		if err != nil {
+			log.Fatal(err)
+		}
+		for _, v := range pvals {
+			fmt.Printf("获取到的数据 为%s\t", string(*(v.(*[]byte))))
+		}
+		fmt.Printf("\n")
+	}
+	rows.Close()
+}
+
+func TestStmtQuery(t *testing.T) {
+
+	stmt, _ := db.Prepare("select * from gotest where name = ?;")
+	rows, err := stmt.Query("'gc'")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	var cols []string
+	cols, err = rows.Columns()
+	if err != nil {
+		log.Fatal(err)
+	}
+	pvals := make([]interface{}, len(cols))
+	for key, _ := range pvals {
+
+		dest := make([]byte, 216)
+		pvals[key] = &dest
+	} /* end for */
+
+	for rows.Next() {
+		err = rows.Scan(pvals...)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		for _, v := range pvals {
+			fmt.Printf("%s\t", string(*(v.(*[]byte))))
+		}
+		fmt.Printf("\n")
+	}
+
+	rows.Close()
+
+}

+ 4 - 0
xugu/buffer.go

@@ -43,6 +43,10 @@ func (b *buffer) peekChar() byte {
 	//	fmt.Println("peekChar内部的: ", b.buf[b.idx:])
 	return ret
 }
+func (b *buffer) reset() {
+	b.idx = 0
+	b.buf = make([]byte, 2048)
+}
 
 // 返回指定字节的切片
 func (b *buffer) readNext(need int, reverse bool) []byte {

+ 0 - 401
xugu/discard.go

@@ -1,401 +0,0 @@
-package xugu
-
-import (
-	"bufio"
-	"encoding/binary"
-	"errors"
-	"fmt"
-	"net"
-)
-
-// ----------------------------------------------------------------------------------------------------
-type SrvResponse struct {
-	FormArgDescri []*FormArgDescri
-	SelectResult  []*SelectResult
-	InsertResult  []*InsertResult
-	UpdateResult  []*UpdateResult
-	DeleteResult  []*DeleteResult
-	ProcRet       []*ProcRet
-	OutParamRet   []*OutParamRet
-	ErrInfo       []*ErrInfo
-	WarnInfo      []*WarnInfo
-	Message       []*Message
-}
-
-// func parseResponse(conn net.Conn) (*SrvResponse, error) {
-// 	response := &SrvResponse{}
-// 	reader := bufio.NewReader(conn)
-
-// 	for {
-// 		msgType, err := reader.ReadByte()
-// 		if err != nil {
-// 			return nil, err
-// 		}
-
-// 		switch msgType {
-// 		case '$':
-// 			argDescri, err := parseFormArgDescri(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.FormArgDescri = append(response.FormArgDescri, argDescri)
-
-// 		case 'I':
-// 			insertResult, err := parseInsertResult(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.InsertResult = append(response.InsertResult, insertResult)
-// 		case 'U':
-// 			updateResult, err := parseUpdateResult(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.UpdateResult = append(response.UpdateResult, updateResult)
-// 		case 'D':
-// 			deleteResult, err := parseDeleteResult(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.DeleteResult = append(response.DeleteResult, deleteResult)
-// 		case 'O':
-// 			procRet, err := parseProcRet(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.ProcRet = append(response.ProcRet, procRet)
-// 		case 'P':
-// 			outParamRet, err := parseOutParamRet(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.OutParamRet = append(response.OutParamRet, outParamRet)
-// 		case 'E':
-// 			errInfo, err := parseErrInfo(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.ErrInfo = append(response.ErrInfo, errInfo)
-// 		case 'W':
-// 			warnInfo, err := parseWarnInfo(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.WarnInfo = append(response.WarnInfo, warnInfo)
-// 		case 'M':
-// 			message, err := parseMessage(reader)
-// 			if err != nil {
-// 				return nil, err
-// 			}
-// 			response.Message = append(response.Message, message)
-// 		case 'K':
-// 			return response, nil
-// 		default:
-// 			return nil, fmt.Errorf("未知的消息类型: %v", msgType)
-// 		}
-// 	}
-// }
-
-// 解析函数的实现,例如 parseFormArgDescri 等
-
-func readString(reader *bufio.Reader, length int) (string, error) {
-	bytes := make([]byte, length)
-	_, err := reader.Read(bytes)
-	return string(bytes), err
-}
-
-func parseFieldDescri(reader *bufio.Reader) (FieldDescri, error) {
-	fieldDescri := FieldDescri{}
-	var err error
-
-	fieldDescri.FieldNameLen, err = readInt32(reader)
-	if err != nil {
-		return fieldDescri, err
-	}
-	fieldDescri.FieldName, err = readString(reader, fieldDescri.FieldNameLen)
-	if err != nil {
-		return fieldDescri, err
-	}
-	// fieldDescri.FieldType, err = fieldType(reader)
-	// if err != nil {
-	// 	return fieldDescri, err
-	// }
-	fieldDescri.FieldPreciScale, err = readInt32(reader)
-	if err != nil {
-		return fieldDescri, err
-	}
-	fieldDescri.FieldFlag, err = readInt32(reader)
-	if err != nil {
-		return fieldDescri, err
-	}
-
-	return fieldDescri, nil
-}
-
-func xgSockRecvCommand(pConn *xuguConn) {
-
-}
-
-func xgSockRecvData(conn net.Conn, buffer *[]byte) (int, error) {
-	// 接收服务器的返回消息
-	n, err := conn.Read(*buffer)
-	if err != nil {
-		fmt.Println("接收服务器消息错误Error reading from server:", err)
-		return 0, err
-	}
-	return n, nil
-}
-
-// TODO : pConn.bkChar != 0x0  这一部分代码需要修改
-func rhRecv(pConn *xuguConn, buff []byte, dLen uint32) error {
-	//old_len := dLen
-	if pConn.bkChar != 0x0 {
-		buff[0] = pConn.bkChar
-		pConn.bkChar = 0x0
-		//buff = buff[1:]
-		dLen--
-	}
-	//接收加密登录信息
-	if dLen != 0 {
-		for dLen != 0 {
-			if pConn.useSSL {
-				buffer := make([]byte, 1024)
-				n, err := pConn.conn.Read(buffer)
-				if err != nil {
-					return err
-				} else {
-					//xgCacheRecv(pConn, buffer, int32(n))
-				}
-				if n <= 0 {
-					return errors.New("read error")
-				}
-
-			}
-		}
-		//	DECRYPT(p_conn, buff, old_len); //解密缓冲
-	}
-	return nil
-}
-
-// 接收单个字符
-func rhRecvChar(pConn *xuguConn, valp *byte) (bool, error) {
-
-	var ch [1]byte
-	err := rhRecv(pConn, ch[:], 1)
-	if err != nil {
-		return false, err
-	}
-	fmt.Println("ch[] = ", ch[:])
-	*valp = ch[0]
-	return true, nil
-
-}
-
-//读取检查一个字符
-
-func rhRecvStr(pConn *xuguConn, valp *[]byte) (int, error) {
-	var len uint32
-	if err := rhRecvInt32(pConn, &len); err != nil {
-		return -4, err
-	}
-	pBuff := make([]byte, len+1)
-	if err := rhRecv(pConn, pBuff, len); err != nil {
-		return -4, err
-	}
-	pBuff[len] = 0x0
-	*valp = pBuff
-	return 0, nil
-}
-func rhRecvInt32(pconn *xuguConn, i *uint32) error {
-	var buff [4]byte
-	err := rhRecv(pconn, buff[:], 4)
-	if err != nil {
-		return err
-	}
-
-	// 将大端字节序转换为主机字节序
-	*i = binary.BigEndian.Uint32(buff[:])
-
-	return nil
-}
-
-// // 从连接中接收字段信息,并将其存储在 Result 结构体中。
-// func recvFiledsInfo(pConn *xuguConn, pRes *Result) (int, error) {
-// 	var fieldNum uint32
-
-// 	//var pRet *FieldInfo
-// 	if err := rhRecvInt32(pConn, &fieldNum); err != nil {
-// 		return XG_NET_ERROR, err
-// 	}
-// 	if fieldNum > 4000 {
-// 		return XG_NET_ERROR, errors.New("fieldNum >4000")
-// 	}
-
-// 	pRes.FieldNum = fieldNum
-
-// 	pRet := make([]FieldInfo, fieldNum)
-// 	//接收字段详细信息
-// 	for n := uint32(0); n < fieldNum; n++ {
-// 		if ret, err := recvAttrDesItem(pConn, &pRet[n]); err != nil {
-// 			return ret, err
-// 		}
-// 	}
-
-// 	bytesN := (fieldNum*2 + 7) / 8
-// 	for n := uint32(0); n < fieldNum; n++ {
-// 		pRet[n].Offset = bytesN
-// 		bytesN += uint32(getSQLCType(pRet[n].TypeID))
-// 	}
-// 	// TODO : 这里需要重新修改
-// 	// 在c代码中这里还加上了 ROW_HEAD_SIZE
-// 	pRes.RowSize = bytesN + 24
-// 	pRes.ColInfos = pRet
-
-// 	return 0, nil
-// }
-
-// // 接收字段属性描述
-// func recvAttrDesItem(pConn *xuguConn, pItem *FieldInfo) (int, error) {
-// 	var nameLen uint32
-// 	var tabName []byte
-// 	var alias []byte
-
-// 	if err := rhRecvInt32(pConn, &nameLen); err != nil {
-// 		return XG_NET_ERROR, err
-// 	}
-// 	if nameLen > 1024 {
-// 		return XG_NET_ERROR, errors.New("nameLen >1024")
-// 	}
-// 	var name []byte
-// 	if err := rhRecv(pConn, name, nameLen); err != nil {
-// 		return XG_NET_ERROR, err
-// 	}
-
-// 	if ret := bytes.IndexByte(name, '%'); ret != -1 {
-// 		alias = name[ret+1:]
-// 		name = name[:ret-1]
-// 	}
-
-// 	if ret := bytes.IndexByte(name, '.'); ret != -1 {
-// 		//如果 return_schema_on 为真,那么这里将得到 schema_name.tab_name。
-// 		tabName = name[:ret-1]
-// 		name = name[ret+1:]
-// 	}
-
-// 	pItem.TabName = string(tabName)
-// 	pItem.Name = string(name)
-// 	pItem.Alias = string(alias)
-
-// 	if err := rhRecvInt32(pConn, &pItem.TypeID); err != nil {
-// 		return XG_NET_ERROR, err
-// 	}
-// 	if err := rhRecvInt32(pConn, &pItem.Modi); err != nil {
-// 		return XG_NET_ERROR, err
-// 	}
-// 	if err := rhRecvInt32(pConn, &pItem.Flags); err != nil {
-// 		return XG_NET_ERROR, err
-// 	}
-
-// 	pItem.CTypeID = uint32(getSQLCType(pItem.TypeID))
-
-// 	return 0, nil
-// }
-
-func readInt32(reader *bufio.Reader) (int, error) {
-	var value int32
-	err := binary.Read(reader, binary.BigEndian, &value)
-	return int(value), err
-}
-
-//接收结果集
-//int  recv_Query_Rs(Conn_Attrs* p_conn,Result** pp_res,int*field_num,int64* rowcount,int* effected_num)
-/* recv_Query_Rs :
-* p_conn conn inout
-* pp_res  output
- */
-// func recvQueryRs(pConn *xuguConn, ppRes **Result) int {
-// 	ret := 0
-// 	pres := &Result{}
-// 	pres_H := &Result{}
-// 	for {
-// 		var ch byte
-// 		rhRecvChar(pConn, &ch)
-// 		switch ch {
-// 		case 'k':
-// 			if pres_H != nil {
-// 				*ppRes = pres_H
-// 			} else {
-// 				*ppRes = pres
-// 			}
-// 			return ret
-
-// 		case 'E', 'F':
-// 			var errStr []byte
-// 			rhRecvStr(pConn, &errStr)
-// 			fmt.Printf("[EC031]Error in recv result :%s \n", errStr)
-// 			pConn.errStr = errStr
-// 			ret = XG_ERROR
-// 			continue
-
-// 		case 'W', 'M':
-// 			var errStr []byte
-// 			rhRecvStr(pConn, &errStr)
-// 			fmt.Printf("[EC032]Server Warning in recv result :%s \n", errStr)
-// 			pConn.errStr = errStr
-// 			ret = XG_ERROR // cyj 是否有点错误
-// 		case 'I':
-// 			//var len uint32
-// 			// if !rhRecvInt32(pConn, &len) {
-
-// 			// }
-
-// 		case 'U', 'D':
-// 		case 'A':
-// 			if pres == nil {
-// 				pres := &Result{}
-// 				pres.Type = HT_RS
-// 			} else {
-// 				var prev = &Result{}
-// 				if pres_H != nil {
-// 					prev = pres_H
-// 					for prev.NextResult != nil {
-// 						prev = prev.NextResult
-// 					}
-// 				} else {
-// 					pres_H = pres
-// 					prev = pres_H
-// 				}
-// 				pres = &Result{}
-// 				pres.Type = HT_RS
-// 				prev.NextResult = pres
-// 			}
-// 			pres.SQLType = SQL_SELECT
-// 			pres.DbcFlob = pConn
-
-// 			// _, err := recvFiledsInfo(pConn, pres)
-// 			// if err != nil {
-// 			// 	return XG_SOCKET_ERROR,
-// 			// }
-
-// 		}
-
-// 	}
-// 	return 0
-// }
-
-// recv_record
-// func recvRecords20000(p_conn *Conn_Attrs, p_res *Result) int {
-// 	pRow := RhRow{}
-
-// 	// if p_res.PBlokmemls == nil{
-// 	// 	p_res.PBlokmemls =
-// 	// }
-
-// 	for i := 0; i < 20000; i++ {
-// 		//recvRecord(p_conn, p_res, &pRow)
-// 	}
-// }
-
-func recvRecord() {
-
-}

+ 111 - 18
xugu/xugu_conn.go

@@ -59,16 +59,22 @@ func (xgConn *xuguConn) Get_error() error {
 
 func (xgConn *xuguConn) Begin() (driver.Tx, error) {
 
-	return nil, nil
+	_, err := xgConn.exec("set auto_commit off;")
+	if err != nil {
+		return nil, err
+	}
+
+	return &xuguTx{tconn: xgConn}, nil
 
 }
 
 var prepareCount = 0
 
 func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
+	xgConn.mu.Lock()
 	fmt.Println(">>>>>(xgConn *xuguConn) Prepare(query string)")
 	prepareName := fmt.Sprintf("GTONG%d", prepareCount)
-
+	prepareCount++
 	//判断sql类型
 	switch switchSQLType(sql) {
 	case SQL_PROCEDURE:
@@ -83,7 +89,6 @@ func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
 	parser := &xuguParse{
 		bind_type:   0,
 		param_count: 0,
-		position:    0,
 	}
 	count := parser.assertParamCount(sql)
 	parser.param_count = count
@@ -110,6 +115,7 @@ func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
 	prepareName = fmt.Sprintf("? %s ", prepareName)
 	stmt.prename = []byte(prepareName)
 	xgConn.prepareName = prepareName
+	xgConn.mu.Unlock()
 	return stmt, nil
 }
 
@@ -125,6 +131,7 @@ func (xgConn *xuguConn) Close() error {
 
 func (xgConn *xuguConn) Exec(sql string,
 	args []driver.Value) (driver.Result, error) {
+
 	fmt.Printf(">>> (xgConn *xuguConn) Exec: %s, %v\n", sql, args)
 
 	if switchSQLType(sql) == SQL_SELECT {
@@ -142,7 +149,6 @@ func (xgConn *xuguConn) Exec(sql string,
 		parser := &xuguParse{
 			bind_type:   0,
 			param_count: len(args),
-			position:    0,
 		}
 
 		for pos, param := range args {
@@ -158,11 +164,31 @@ func (xgConn *xuguConn) Exec(sql string,
 		fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:])
 		fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf))
 
-		rs, err := parseMsg(readBuf, xgConn)
+		aR, err := parseMsg(readBuf, xgConn)
 		if err != nil {
 			return nil, err
 		}
-		return rs, nil
+		switch aR.rt {
+		case insertResult:
+
+			return &xuguResult{
+				affectedRows: int64(0),
+				insertId:     int64(0),
+			}, nil
+
+		case errInfo:
+
+			return nil, errors.New(string(aR.e.ErrStr))
+		case warnInfo:
+
+			return nil, errors.New(string(aR.w.WarnStr))
+		default:
+			return &xuguResult{
+				affectedRows: int64(0),
+				insertId:     int64(0),
+			}, nil
+		}
+
 	}
 
 	rs, err := xgConn.exec(sql)
@@ -183,11 +209,29 @@ func (xgConn *xuguConn) exec(sql string) (driver.Result, error) {
 	fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:])
 	fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf))
 
-	rs, err := parseMsg(readBuf, xgConn)
+	aR, err := parseMsg(readBuf, xgConn)
 	if err != nil {
 		return nil, err
 	}
-	return rs, nil
+
+	switch aR.rt {
+	case insertResult:
+		return &xuguResult{
+			affectedRows: int64(0),
+			insertId:     int64(0),
+		}, nil
+
+	case errInfo:
+		return nil, errors.New(string(aR.e.ErrStr))
+	case warnInfo:
+		return nil, errors.New(string(aR.w.WarnStr))
+	default:
+		return &xuguResult{
+			affectedRows: int64(0),
+			insertId:     int64(0),
+		}, nil
+	}
+
 }
 
 func (xgConn *xuguConn) Query(sql string,
@@ -198,24 +242,49 @@ func (xgConn *xuguConn) Query(sql string,
 		return nil, errors.New("The executed SQL statement is not a SELECT")
 	}
 
-	parser := &xuguParse{
-		bind_type:   0,
-		param_count: 0,
-		position:    0,
-	}
-
 	if len(args) != 0 {
-
+		parser := &xuguParse{
+			bind_type:   0,
+			param_count: len(args),
+		}
 		for pos, param := range args {
-
 			err := parser.assertParamType(param, pos)
 			if err != nil {
 				return nil, err
 			}
 		}
+		sockSendPutStatement(xgConn, []byte(xgConn.prepareName), &parser.values, len(args))
+		XGC_Execute(xgConn)
+
+		//处理服务器返回
+		xgConn.conn.Read(xgConn.readBuff.buf)
+		readBuf := &xgConn.readBuff
+		fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:])
+		fmt.Println("Message from server EXEC:", string(xgConn.readBuff.buf))
+
+		aR, err := parseMsg(readBuf, xgConn)
+		if err != nil {
+			return nil, err
+		}
+		switch aR.rt {
+		case selectResult:
+			rows := &xuguRows{
+				rows_conn:   xgConn,
+				results:     aR.s,
+				lastRowRelt: int(0),
+				lastRelt:    int(0),
+				prepared:    false,
+			}
+
+			return rows, nil
+		case errInfo:
+
+			return nil, errors.New(string(aR.e.ErrStr))
+		case warnInfo:
+
+			return nil, errors.New(string(aR.w.WarnStr))
+		default:
 
-		if len(parser.values) != parser.assertParamCount(sql) {
-			return nil, errors.New("The number of parameters does not match")
 		}
 
 	}
@@ -264,3 +333,27 @@ func (xgConn *xuguConn) Ping(ctx context.Context) error {
 	xgConn.readBuff.buf = make([]byte, 2048)
 	return nil
 }
+
+func (pConn *xuguConn) ExecContext(ctx context.Context,
+	query string, args []driver.NamedValue) (driver.Result, error) {
+
+	Value, err := namedValueToValue(args)
+	if err != nil {
+		return nil, err
+	}
+
+	return pConn.Exec(query, Value)
+}
+
+func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
+	__Par := make([]driver.Value, len(named))
+
+	for pos, Param := range named {
+		if len(Param.Name) > 0 {
+			return nil, errors.New("Driver does not support the use of Named Parameters")
+		}
+		__Par[pos] = Param.Value
+	}
+
+	return __Par, nil
+}

+ 16 - 0
xugu/xugu_define.go

@@ -1,5 +1,21 @@
 package xugu
 
+//协议消息类型
+type msgType byte
+
+const (
+	selectResult msgType = iota + 0x00
+	insertResult
+	updateResult
+	peleteResult
+	procRet
+	outParamRet
+	errInfo
+	warnInfo
+	message
+	formArgDescri
+)
+
 // 定义 HANDLE_TYPE 枚举
 type HANDLE_TYPE int
 

+ 0 - 43
xugu/xugu_fields.go

@@ -8,51 +8,8 @@ import (
 	"time"
 )
 
-type xuguField struct {
-	tableName string
-
-	/*
-	 * Store the name of the column
-	 * name of the current field
-	 * */
-	name   string
-	length int
-
-	/*
-	 * Store the data type information
-	 * of the current field column
-	 * */
-	fieldType fieldType
-}
-
 type fieldType uint16
 
-// const (
-// 	fieldTypeBool fieldType = iota + 0x01
-// 	fieldTypeChar
-// 	fieldTypeTinyint
-// 	fieldTypeShort
-// 	fieldTypeInteger
-// 	fieldTypeBigint
-// 	fieldTypeFloat
-// 	fieldTypeDouble
-// 	fieldTypeNumeric
-// 	fieldTypeDate
-// 	fieldTypeTime
-// 	fieldTypeTimeTZ
-// 	fieldTypeDatetime   fieldType = 23
-// 	fieldTypeDatetimeTZ fieldType = 14
-// 	fieldTypeBinary     fieldType = 15
-// 	fieldTypeVarChar    fieldType = 30
-
-// 	fieldTypeInterval    fieldType = 21
-// 	fieldTypeIntervalY2M fieldType = 28
-// 	fieldTypeIntervalD2S fieldType = 31
-// 	fieldTypeLob         fieldType = 40
-// 	fieldTypeClob        fieldType = 41
-// 	fieldTypeBlob        fieldType = 42
-// )
-
 const (
 	fieldType_EMPTY fieldType = iota + 0x00
 	fieldType_NULL

+ 151 - 37
xugu/xugu_parse.go

@@ -10,26 +10,10 @@ import (
 	"time"
 )
 
-type ParseParam interface {
-	// 转换参数数据类型
-	assertParamType(driver.Value, int) error
-
-	// 解析参数数量
-	assertParamCount(string) int
-
-	// 解析参数绑定类型(按参数名称绑定类型和按参数位置绑定类型)
-	assertBindType(string) int
-
-	// 解析参数名称
-	assertParamName(string) error
-}
-
 // 辅助结构体
 type xuguValue struct {
 	// 布尔值,如果值为 true,表示当前字段的数据类型是大对象数据类型
 	islob bool
-	plob  []byte
-
 	//字段名
 	paramName []byte
 	//paramNameLength [2]byte
@@ -37,10 +21,6 @@ type xuguValue struct {
 	value []byte
 	// 值的长度
 	valueLength int
-
-	// buff 通常指定要绑定的参数数据的内存缓冲区大小
-	//buff int
-
 	// 字段的类型
 	types fieldType
 }
@@ -58,7 +38,7 @@ type xuguParse struct {
 
 	values []xuguValue
 	// 当参数绑定类型为按参数占位符绑定时,position 标识参数的位置
-	position int
+	//position int
 }
 
 // 判断参数个数
@@ -198,24 +178,12 @@ func (param *xuguParse) assertParamType(dV driver.Value, pos int) error {
 
 	case []byte:
 
-		// re := cgo_xgc_new_lob(&dest.plob)
-
-		// if re < 0 {
-		// 	return errors.New("Cannot create new large object")
-		// }
-
 		srcv, ok := dV.([]byte)
 		if !ok {
 
 			return errors.New("assertParamType  []byte error")
 		}
 
-		// cgo_xgc_put_lob_data(
-		// 	&dest.plob,
-		// 	unsafe.Pointer((*C.char)(unsafe.Pointer(&srcv[0]))),
-		// 	len(srcv))
-		// cgo_xgc_put_lob_data(&dest.plob, nil, -1)
-
 		dest.value = srcv
 		dest.valueLength = len(srcv)
 
@@ -233,7 +201,6 @@ func (param *xuguParse) assertParamType(dV driver.Value, pos int) error {
 		return errors.New("Unknown data type")
 	}
 
-	param.position = pos
 	param.values = append(param.values, dest)
 
 	return nil
@@ -372,6 +339,20 @@ type ArgDescri struct {
 	ArgPreciScale uint32
 }
 
+type allResult struct {
+	rt msgType
+	s  *SelectResult
+	i  *InsertResult
+	u  *UpdateResult
+	d  *DeleteResult
+	p  *ProcRet
+	o  *OutParamRet
+	e  *ErrInfo
+	w  *WarnInfo
+	m  *Message
+	f  *FormArgDescri
+}
+
 func parseSelectResult(readBuf *buffer) (*SelectResult, error) {
 	fmt.Println("调用 parseSelectResult")
 	data := &SelectResult{}
@@ -419,6 +400,7 @@ func parseSelectResult(readBuf *buffer) (*SelectResult, error) {
 		defer func() {
 			fmt.Println("\n\n=========获取行数据结束=================================")
 		}()
+
 		char := readBuf.peekChar()
 		fmt.Println("  --char: ", string(char))
 		readBuf.idx++
@@ -463,7 +445,9 @@ func parseSelectResult(readBuf *buffer) (*SelectResult, error) {
 					}
 					typeIdx++
 					//fieldTypeTinyint,
-				case fieldType_I1, fieldType_I4, fieldType_I8:
+				case fieldType_I1, fieldType_I2, fieldType_I4, fieldType_I8, fieldType_R4, fieldType_R8,
+					fieldType_DATE, fieldType_TIME, fieldType_DATETIME,
+					fieldType_TIME_TZ:
 
 					col.Col_Data = reverseBytes(readBuf.readNext(int(col.Col_len), false))
 					data.Values[colIdx] = append(data.Values[colIdx], col)
@@ -485,18 +469,39 @@ func parseSelectResult(readBuf *buffer) (*SelectResult, error) {
 						//break
 					}
 					typeIdx++
-
+				case fieldType_CLOB, fieldType_BLOB, fieldType_BINARY:
+					col.Col_Data = readBuf.readNext(int(col.Col_len), false)
+					data.Values[colIdx] = append(data.Values[colIdx], col)
+					colIdx++
+					char := readBuf.peekChar()
+					fmt.Println("从查询返回 解析出的值 :", col.Col_Data, string(col.Col_Data))
+					if char == 'R' {
+						readBuf.idx++
+						colIdx = 0
+						if typeIdx >= int(data.Field_Num)-1 {
+							fmt.Println("typeIdx <= int(data.Field_Num)-1: ", typeIdx, int(data.Field_Num)-1)
+							typeIdx = 0
+						} else {
+							typeIdx++
+						}
+						continue
+					} else if char == 'K' {
+						return data, nil
+						//break
+					}
 				} //swich end
 				fmt.Println("循环结束")
+				fmt.Printf("解析请求 data:%#v\n", data.Fields)
 				return data, nil
 			} //for end
 		} else {
 			break
 		}
 	default:
+		fmt.Println("解析请求 错误", data.Fields)
 		return nil, errors.New("parseQueryResult error")
 	} //swich end
-
+	fmt.Println("解析请求 data", data.Fields)
 	return data, nil
 }
 
@@ -604,3 +609,112 @@ func parseFormArgDescri(readBuf *buffer) (*FormArgDescri, error) {
 	fmt.Printf("formArgDescri %#v \n", formArgDescri)
 	return formArgDescri, nil
 }
+
+func parseMsg(readBuf *buffer, pConn *xuguConn) (*allResult, error) {
+	var err error
+	aR := allResult{}
+	for {
+		char := readBuf.peekChar()
+		fmt.Println("parseMsg 内的 peekChar: ", char, "-", string(char))
+		switch char {
+
+		case 'K':
+			fmt.Println("消息类型为K")
+			// readBuf.buf = make([]byte, 2048)
+			//readBuf.idx++
+			// readBuf.buf = make([]byte, 2048)
+			// readBuf.idx = 0
+			readBuf.reset()
+			return &aR, nil
+
+		case '$':
+			readBuf.idx++
+			fmt.Println("消息类型为$")
+			if aR.f, err = parseFormArgDescri(readBuf); err != nil {
+				return nil, err
+			}
+			aR.rt = '$'
+			return &aR, err
+		case 'A':
+			//readBuf.idx++
+			fmt.Println("消息类型为A")
+			if aR.s, err = parseSelectResult(readBuf); err != nil {
+				return nil, err
+			}
+			aR.rt = 'A'
+			return &aR, err
+
+		case 'I':
+			readBuf.idx++
+			fmt.Println("消息类型为I")
+			if aR.i, err = parseInsertResult(readBuf); err != nil {
+				return nil, err
+			}
+			aR.rt = 'I'
+			return &aR, err
+
+		case 'U':
+			fmt.Println("消息类型为U")
+			readBuf.idx++
+			if aR.u, err = parseUpdateResult(readBuf); err != nil {
+				return nil, err
+			}
+			aR.rt = 'U'
+			return &aR, err
+
+		case 'D':
+			fmt.Println("消息类型为D")
+			readBuf.idx++
+			readBuf.idx++
+			if aR.d, err = parseDeleteResult(readBuf); err != nil {
+				return nil, err
+			}
+			aR.rt = 'D'
+			return &aR, err
+
+		case 'E':
+			fmt.Println("消息类型为E")
+			readBuf.idx++
+
+			if aR.e, err = parseErrInfo(readBuf); err != nil {
+				return nil, err
+			}
+			pConn.errStr = aR.e.ErrStr
+			aR.rt = 'E'
+			return &aR, err
+
+		case 'W':
+			fmt.Println("消息类型为W")
+			readBuf.idx++
+			if aR.w, err = parseWarnInfo(readBuf); err != nil {
+				return nil, err
+			}
+			aR.rt = 'W'
+			return &aR, err
+
+		case 'M':
+			fmt.Println("消息类型为M")
+			readBuf.idx++
+			if aR.m, err = parseMessage(readBuf); err != nil {
+				return nil, err
+			}
+			aR.rt = 'M'
+			return &aR, err
+
+		default:
+			fmt.Println("消息类型为其他")
+			return nil, errors.New("parseMsg: unknown message type")
+		}
+
+	}
+}
+
+// func handleParsedMsg(params ParseMsgParams) (ParseResult, error) {
+// 	result, err := parseMsg(params)
+// 	if err != nil {
+// 		return nil, err
+// 	}
+
+// 	result.Process()
+// 	return result, nil
+// }

+ 2 - 25
xugu/xugu_prepare.go

@@ -7,6 +7,7 @@ import (
 const RETURN_PREPARE_SELECT = 19665
 
 func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
+
 	fmt.Println("\n ---xuguPrepare")
 	sqlRet := fmt.Sprintf("PREPARE %s AS %s", prepareName, cmd_sql)
 	//上锁
@@ -28,6 +29,7 @@ func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
 	fmt.Println("Message from server:", string(pConn.readBuff.buf[pConn.readBuff.idx:]))
 
 	fmt.Println("\n ---xuguPrepare end")
+
 	return nil
 }
 
@@ -95,28 +97,3 @@ func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
 // 	var ch byte
 // 	var err error
 // 	ret := XG_OK
-
-// 	sql := fmt.Sprintf("DEALLOCATE %s ", prepareName)
-// 	pConn.mu.Lock()
-// 	SockSendCommand0(pConn, sql)
-// 	boolRet, err := rhRecvChar(pConn, &ch)
-// 	fmt.Println("rhRecvChar => ch ", ch)
-// 	if boolRet || err != nil {
-// 		ret = XG_NET_ERROR
-// 	} else if ch == 'K' {
-// 		pConn.havePrepare = 0
-// 	} else {
-// 		pConn.havePrepare = 0
-// 		var errStre []byte
-// 		ret, err = rhRecvStr(pConn, &errStre)
-// 		if err != nil || ret < 0 {
-// 			pConn.mu.Unlock()
-// 			return ret
-// 		}
-
-// 		rhRecvChar(pConn, &ch)
-// 		ret = XG_ERROR
-// 	}
-// 	pConn.mu.Unlock()
-// 	return 0
-// }

+ 1 - 15
xugu/xugu_rows.go

@@ -63,7 +63,6 @@ func (row *xuguRows) Next(dest []driver.Value) error {
 		case fieldType_BINARY,
 			fieldType_CLOB,
 			fieldType_BLOB:
-			fmt.Println("这是 fieldType_BINARY")
 			dest[j] = row.results.Values[j][row.results.rowIdx].Col_Data
 
 		case fieldType_TIME,
@@ -102,14 +101,7 @@ func (row *xuguRows) Next(dest []driver.Value) error {
 func (row *xuguRows) Columns() []string {
 
 	fmt.Println(">>>>>(self *xuguRows) Columns() ")
-	// if row.results == nil {
-	// 	return nil
-	// }
-	// if row.results.Fields != nil {
-	// 	//return self.results.Fields
-	// }
-
-	//columns := make([]string, int(row.results.Field_Num))
+
 	var columns []string
 
 	for i, v := range row.results.Fields {
@@ -117,12 +109,6 @@ func (row *xuguRows) Columns() []string {
 		fmt.Printf("append(columns, v.FieldName) 个数%d ,v: %s\n", i+1, columns[i])
 	}
 
-	// fmt.Println(" -columns len  ", len(columns))
-
-	// for i, v := range columns {
-
-	// 	fmt.Println("  -columns:  ", i, v)
-	// }
 	return columns
 }
 

+ 4 - 109
xugu/xugu_sock.go

@@ -30,120 +30,15 @@ func xgSockOpenConn(ctx context.Context, pConn *xuguConn) error {
 	return nil
 }
 
-// func xgSockPrepare(pConn *xuguConn, sql string, prepareName *string) (int, error) {
-// 	fmt.Println(">>>>>xgSockPrepare")
-
-// 	if "" == *prepareName {
-// 		ret := 0
-// 		//p_params := &conn.params
-// 		if pConn.havePrepare != 0 {
-// 			ret = xgCmdUnprepare(pConn, pConn.prepareName)
-// 			if ret < 0 {
-// 				return ret, errors.New("XG_ERROR")
-// 			}
-// 			pConn.prepareName = ""
-
-// 			pConn.prepareName = fmt.Sprintf(pConn.prepareName+"STC%d", pConn.prepareNo)
-// 			ret, err := xgCmdPrepare(pConn, sql, &pConn.prepareName)
-// 			if err != nil {
-// 				return ret, err
-// 			}
-// 		}
-// 		return ret, errors.New("xgSockPrepare_ERROR")
-// 	}
-
-// 	ret := 0
-
-// 	//	XGCSParam* p_params= pconn->params;
-// 	old_p := pConn.havePrepare
-// 	//	char* sql=strdup(cmd_sql);
-
-// 	*prepareName = fmt.Sprintf("STC%d", pConn.prepareNo)
-// 	pConn.prepareNo++ // 递增 prepare_no 的值
-
-// 	ret, err := xgCmdPrepare(pConn, sql, &pConn.prepareName)
-// 	if err != nil {
-// 		return 0, err
-// 	}
-// 	pConn.havePrepare = old_p // this keeped by prepare_name
-
-// 	return ret, nil
-
-// }
-
-func parseMsg(readBuf *buffer, pConn *xuguConn) (rs *xuguResult, err error) {
-	result := &xuguResult{
-		affectedRows: 0,
-		insertId:     0,
-	}
-
-	for {
-		char := readBuf.peekChar()
-		fmt.Println("parseMsg 内的 peekChar: ", char, "-", string(char))
-		switch char {
-		case 'K':
-			fmt.Println("消息类型为K")
-			// readBuf.buf = make([]byte, 2048)
-			//readBuf.idx++
-			readBuf.buf = make([]byte, 2048)
-			readBuf.idx = 0
-			return result, nil
-		case '$':
-			readBuf.idx++
-			fmt.Println("消息类型为$")
-			parseFormArgDescri(readBuf)
-		case 'A':
-			readBuf.idx++
-			fmt.Println("消息类型为A")
-			parseSelectResult(readBuf)
-		case 'I':
-			readBuf.idx++
-			fmt.Println("消息类型为I")
-			parseInsertResult(readBuf)
-		case 'U':
-			fmt.Println("消息类型为U")
-			readBuf.idx++
-			parseUpdateResult(readBuf)
-		case 'D':
-			fmt.Println("消息类型为D")
-			readBuf.idx++
-			parseDeleteResult(readBuf)
-		case 'E':
-			fmt.Println("消息类型为E")
-			readBuf.idx++
-			pErr, err := parseErrInfo(readBuf)
-			if err != nil {
-				return nil, err
-			}
-
-			// fmt.Println("E ReadBuf : ", readBuf.buf[readBuf.idx:])
-			// fmt.Println("E ReadBuf string: ", string(readBuf.buf[readBuf.idx:]))
-			pConn.errStr = pErr.ErrStr
-			// char := readBuf.peekChar()
-			// fmt.Println("Exec 内的 peekChar: ", char)
-		case 'W':
-			fmt.Println("消息类型为W")
-			readBuf.idx++
-			parseWarnInfo(readBuf)
-		case 'M':
-			fmt.Println("消息类型为M")
-			readBuf.idx++
-			parseMessage(readBuf)
-		default:
-			fmt.Println("消息类型为其他")
-			return nil, errors.New("parseMsg: unknown message type")
-		}
-
-	}
-}
-
-func xuguSockRecvMsg(pConn *xuguConn) (rs *xuguResult, err error) {
+func xuguSockRecvMsg(pConn *xuguConn) (interface{}, error) {
 	pConn.conn.Read(pConn.readBuff.buf)
 	fmt.Println("pConn.readBuff.buf ::", pConn.readBuff.buf, string(pConn.readBuff.buf))
-	rs, err = parseMsg(&pConn.readBuff, pConn)
+
+	rs, err := parseMsg(&pConn.readBuff, pConn)
 	if err != nil {
 		fmt.Println("xuguPrepare parseMsg(&pConn.readBuff, pConn)")
 		return nil, err
 	}
+	pConn.readBuff.reset()
 	return rs, nil
 }

+ 21 - 6
xugu/xugu_stmt.go

@@ -56,7 +56,6 @@ func (stmt *xuguStmt) NumInput() int {
 	parser := &xuguParse{
 		bind_type:   0,
 		param_count: 0,
-		position:    0,
 	}
 
 	fmt.Println("stmt.mysql: ", stmt.mysql)
@@ -93,13 +92,28 @@ func (stmt *xuguStmt) Exec(args []driver.Value) (driver.Result, error) {
 	fmt.Println("Message from server EXEC:", readBuf.buf[readBuf.idx:])
 	fmt.Println("Message from server EXEC:", string(stmt.stmt_conn.readBuff.buf))
 
-	//循环读,多条语句执行,会有多条语句返回
-	//读取一字节
-	rs, err := parseMsg(readBuf, stmt.stmt_conn)
+	aR, err := parseMsg(readBuf, stmt.stmt_conn)
 	if err != nil {
 		return nil, err
 	}
-	return rs, nil
+	switch aR.rt {
+	case insertResult:
+		fmt.Println("case insertResult:")
+		return &xuguResult{
+			affectedRows: int64(0),
+			insertId:     int64(0),
+		}, nil
+
+	case errInfo:
+		return nil, errors.New(string(aR.e.ErrStr))
+	case warnInfo:
+		return nil, errors.New(string(aR.w.WarnStr))
+	default:
+		return &xuguResult{
+			affectedRows: int64(0),
+			insertId:     int64(0),
+		}, nil
+	}
 
 	// for {
 	// 	char := readBuf.peekChar()
@@ -163,7 +177,6 @@ func (stmt *xuguStmt) Query(args []driver.Value) (driver.Rows, error) {
 	parser := &xuguParse{
 		bind_type:   0,
 		param_count: 0,
-		position:    0,
 	}
 
 	//参数绑定
@@ -206,6 +219,7 @@ func (stmt *xuguStmt) Query(args []driver.Value) (driver.Rows, error) {
 	// 	return nil, errors.New("The number of parameters does not match")
 	// }
 
+	stmt.stmt_conn.mu.Lock()
 	//最终发送消息给服务器
 	sockSendPutStatement(stmt.stmt_conn, []byte(stmt.mysql), nil, 0)
 	XGC_Execute(stmt.stmt_conn)
@@ -229,5 +243,6 @@ func (stmt *xuguStmt) Query(args []driver.Value) (driver.Rows, error) {
 		rows_conn: stmt.stmt_conn,
 	}
 	fmt.Println("\n>>>>>>stmt Query  end==============================================================")
+	stmt.stmt_conn.mu.Unlock()
 	return a, nil
 }

+ 0 - 11
xugu/xugu_utils.go

@@ -211,17 +211,6 @@ func getSQLCType(typeID uint32) int32 {
 	}
 }
 
-func needCopyV(xgtype int) bool {
-	switch xgtype {
-	case XG_C_DOUBLE, XG_C_FLOAT, DATETIME_ASLONG, XG_C_BIGINT, XG_C_INTEGER, XG_C_SHORT, XG_C_TINYINT, XG_C_CHARN1, XG_C_BOOL, XG_C_NUMERIC:
-		return true
-
-	default:
-		return false
-	}
-
-}
-
 // 整形转换成字节
 func IntToBytes(n uint32, b byte) ([]byte, error) {
 	switch b {