GTong před 10 měsíci
rodič
revize
f8aedda1c7
6 změnil soubory, kde provedl 292 přidání a 55 odebrání
  1. 49 12
      main.go
  2. 133 10
      xugu/xugu_conn.go
  3. 7 6
      xugu/xugu_parse.go
  4. 11 20
      xugu/xugu_sock.go
  5. 51 7
      xugu/xugu_stmt.go
  6. 41 0
      xugu/xugu_tranx.go

+ 49 - 12
main.go

@@ -7,49 +7,86 @@ import (
 )
 
 func main() {
-	db, err := sql.Open("xugusql", "IP=127.0.0.1;DB=SYSTEM;User=SYSDBA;PWD=SYSDBA;Port=5138;AUTO_COMMIT=on;CHAR_SET=UTF8")
+	db, err := sql.Open("xugusql", "IP=10.28.20.101;DB=SYSTEM;User=SYSDBA;PWD=SYSDBA;Port=5190;AUTO_COMMIT=on;CHAR_SET=UTF8")
 	if err != nil {
 		log.Fatal("err ", err)
 	}
 	// if err := db.Ping(); err != nil {
 	// 	log.Fatal("err ping ", err)
 	// }
-	//db.Query("select * from dual")
 
-	// rows, err := db.Query("select * from test1 where id = ?;", 2)
+	// rows, err := db.Query("select * from dual")
 	// if err != nil {
 	// 	log.Fatal(err)
 	// }
-
+	// // rows, err := db.Query("select * from test1 where id = ?;", 4)
+	// // if err != nil {
+	// // 	log.Fatal(err)
+	// // }
 	// var cols []string
 	// cols, err = rows.Columns()
 	// if err != nil {
 	// 	log.Fatal(err)
 	// }
-
+	// fmt.Println("cols: ", cols)
 	// pvals := make([]interface{}, len(cols))
 	// for key, _ := range pvals {
 	// 	dest := make([]byte, 216)
 	// 	pvals[key] = &dest
 	// } /* end for */
-
 	// for rows.Next() {
 	// 	err = rows.Scan(pvals...)
 	// 	if err != nil {
 	// 		log.Fatal(err)
 	// 	}
-
 	// 	for _, v := range pvals {
 	// 		fmt.Printf("输出 %s\t", string(*(v.(*[]byte))))
 	// 	}
 	// 	fmt.Printf("\n")
 	// }
+	//  rows.Close()
+
+	// stmt, err := db.Prepare("select * from test1 where id = ?")
+	// if err != nil {
+	// 	log.Fatal(err)
+	// }
+	// stmt.Exec(2)
+	// ret, err := db.Exec("insert into test1 (id) values (9)")
+	// if err != nil {
+	// 	log.Fatal(err)
+	// }
+	// ret.LastInsertId()
 
+	// stmt, err := db.Prepare("select * from test1 where id = ?")
+	// if err != nil {
+	// 	log.Fatal(err)
+	// }
+	// rows, err := stmt.Query(5)
+	// if err != nil {
+	// 	log.Fatal(err)
+	// }
+	// var cols []string
+	// cols, err = rows.Columns()
+	// if err != nil {
+	// 	log.Fatal(err)
+	// }
+	// fmt.Println("cols: ", cols)
+	// pvals := make([]interface{}, len(cols))
+	// for key, _ := range pvals {
+	// 	dest := make([]byte, 216)
+	// 	pvals[key] = &dest
+	// } /* end for */
+	// for rows.Next() {
+	// 	err = rows.Scan(pvals...)
+	// 	if err != nil {
+	// 		log.Fatal(err)
+	// 	}
+	// 	for _, v := range pvals {
+	// 		fmt.Printf("输出 %s\t", string(*(v.(*[]byte))))
+	// 	}
+	// 	fmt.Printf("\n")
+	// }
 	// rows.Close()
 
-	stmt, err := db.Prepare("select * from dual where id = ?")
-	if err != nil {
-		log.Fatal(err)
-	}
-	stmt.Exec("2")
+	db.Exec("set auto_commit on;")
 }

+ 133 - 10
xugu/xugu_conn.go

@@ -70,14 +70,6 @@ func (xgConn *xuguConn) Close() error {
 	return nil
 }
 
-func (xgConn *xuguConn) Exec(sql string,
-	args []driver.Value) (driver.Result, error) {
-	gt("xuguConn.Exec")
-	defer gt("xuguConn.Exec end")
-
-	return nil, nil
-}
-
 func (xgConn *xuguConn) Query(sql string,
 	args []driver.Value) (driver.Rows, error) {
 	gt("xuguConn.Query")
@@ -85,6 +77,7 @@ func (xgConn *xuguConn) Query(sql string,
 
 	xgConn.mu.Lock()
 	defer xgConn.mu.Unlock()
+
 	// 检测sql语句不是查询则报错
 	if switchSQLType(sql) != SQL_SELECT {
 		return nil, errors.New("The executed SQL statement is not a SELECT")
@@ -129,6 +122,7 @@ func (xgConn *xuguConn) Query(sql string,
 
 	switch aR.rt {
 	case selectResult:
+		fmt.Printf("selectResult %#v\n", aR.s)
 		rows := &xuguRows{
 			rows_conn: xgConn,
 			results:   aR.s,
@@ -155,6 +149,7 @@ func (xgConn *xuguConn) Ping(ctx context.Context) error {
 
 	//send
 	xgConn.mu.Lock()
+	defer xgConn.mu.Unlock()
 	sockSendPutStatement(xgConn, []byte("select count(*) from dual;"), nil, 0)
 	sockSendExecute(xgConn)
 
@@ -164,7 +159,7 @@ func (xgConn *xuguConn) Ping(ctx context.Context) error {
 		return err
 	}
 	xgConn.readBuff.reset()
-	xgConn.mu.Unlock()
+
 	return nil
 }
 
@@ -201,7 +196,9 @@ func (xgConn *xuguConn) Prepare(sql string) (driver.Stmt, error) {
 		paramCount: count,
 		mysql:      sql,
 	}
-	stmt.prename = []byte(xgConn.prepareName)
+	stmt.prename = []byte(fmt.Sprintf("? %s", xgConn.prepareName))
+
+	fmt.Println("stmt.prename ", stmt.prename, string(stmt.prename))
 	return stmt, nil
 }
 
@@ -211,6 +208,7 @@ func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
 	prepareName = fmt.Sprintf("%s%d", prepareName, pConn.prepareNo)
 	sqlRet := fmt.Sprintf("PREPARE %s AS %s", prepareName, cmd_sql)
 	pConn.prepareName = prepareName
+	fmt.Println("pConn.prepareName ", pConn.prepareName)
 	pConn.prepareNo++
 	//send msg
 	sockSendPutStatement(pConn, []byte(sqlRet), nil, 0)
@@ -229,3 +227,128 @@ func xuguPrepare(pConn *xuguConn, cmd_sql string, prepareName string) error {
 
 	return nil
 }
+
+func (xgConn *xuguConn) Exec(sql string,
+	args []driver.Value) (driver.Result, error) {
+	gt("xuguConn.Exec")
+	defer gt("xuguConn.Exec end")
+	xgConn.mu.Lock()
+	defer xgConn.mu.Unlock()
+
+	// 检测sql语句是查询则报错
+	if switchSQLType(sql) == SQL_SELECT {
+		return nil, errors.New("The executed SQL statement is  a SELECT")
+	}
+
+	// 有传进来的参数
+	if len(args) != 0 {
+		values := []xuguValue{}
+		//判断类型
+		for _, param := range args {
+			err := assertParamType(param, &values)
+			if err != nil {
+				return nil, err
+			}
+		}
+		//send msg
+		if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
+			return nil, err
+		}
+
+		if err := sockSendExecute(xgConn); err != nil {
+			return nil, err
+		}
+
+	} else {
+
+		//send msg
+		if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
+			return nil, err
+		}
+
+		if err := sockSendExecute(xgConn); err != nil {
+			return nil, err
+		}
+
+	}
+	//recv msg
+	aR, err := xuguSockRecvMsg(xgConn)
+	if err != nil {
+		return nil, err
+	}
+
+	switch aR.rt {
+	case selectResult:
+
+		return nil, errors.New("exec is Query error")
+	case errInfo:
+
+		return nil, errors.New(string(aR.e.ErrStr))
+	case warnInfo:
+
+		return nil, errors.New(string(aR.w.WarnStr))
+	default:
+		return &xuguResult{
+			affectedRows: int64(0),
+			insertId:     int64(0),
+		}, nil
+	}
+
+}
+
+func (xgConn *xuguConn) exec(sql string, args []driver.Value) (driver.Result, error) {
+	// 有传进来的参数
+	if len(args) != 0 {
+		values := []xuguValue{}
+		//判断类型
+		for _, param := range args {
+			err := assertParamType(param, &values)
+			if err != nil {
+				return nil, err
+			}
+		}
+		//send msg
+		if err := sockSendPutStatement(xgConn, []byte(sql), &values, len(args)); err != nil {
+			return nil, err
+		}
+
+		if err := sockSendExecute(xgConn); err != nil {
+			return nil, err
+		}
+
+	} else {
+
+		//send msg
+		if err := sockSendPutStatement(xgConn, []byte(sql), nil, len(args)); err != nil {
+			return nil, err
+		}
+
+		if err := sockSendExecute(xgConn); err != nil {
+			return nil, err
+		}
+
+	}
+
+	//recv msg
+	aR, err := xuguSockRecvMsg(xgConn)
+	if err != nil {
+		return nil, err
+	}
+	switch aR.rt {
+	case selectResult:
+
+		return nil, errors.New("exec is Query error")
+	case errInfo:
+
+		return nil, errors.New(string(aR.e.ErrStr))
+	case warnInfo:
+
+		return nil, errors.New(string(aR.w.WarnStr))
+	default:
+		return &xuguResult{
+			affectedRows: int64(0),
+			insertId:     int64(0),
+		}, nil
+	}
+
+}

+ 7 - 6
xugu/xugu_parse.go

@@ -37,30 +37,31 @@ func assertParamType(dV driver.Value, values *[]xuguValue) error {
 	var dest xuguValue
 	switch srcv := dV.(type) {
 	case int64:
-		S := strconv.FormatInt(srcv, 10)
-		dest.value = []byte(S)
-		dest.valueLength = strings.Count(S, "") - 1
+		buf := make([]byte, 8)
+		binary.BigEndian.PutUint64(buf, uint64(srcv))
+		dest.value = buf
+		dest.valueLength = 8
 		dest.islob = false
 		dest.types = fieldType_I8
 
 	case float32:
 		S := strconv.FormatFloat(float64(srcv), 'f', 6, 64)
 		dest.value = []byte(S)
-		dest.valueLength = strings.Count(S, "") - 1
+		dest.valueLength = 4
 		dest.islob = false
 		dest.types = fieldType_R4
 
 	case float64:
 		S := strconv.FormatFloat(srcv, 'f', 15, 64)
 		dest.value = []byte(S)
-		dest.valueLength = strings.Count(S, "") - 1
+		dest.valueLength = 8
 		dest.islob = false
 		dest.types = fieldType_R8
 
 	case bool:
 		S := strconv.FormatBool(srcv)
 		dest.value = []byte(S)
-		dest.valueLength = strings.Count(S, "") - 1
+		dest.valueLength = 1
 		dest.islob = false
 		dest.types = fieldType_BOOL
 

+ 11 - 20
xugu/xugu_sock.go

@@ -57,18 +57,11 @@ func sockSendPutStatement(pConn *xuguConn, sql []byte, values *[]xuguValue, para
 
 	fmt.Println("Comand_str + 0", pConn.sendBuff.Bytes())
 	// Param_num
-	if paramCount <= 0 {
-		var Param_num [4]byte
-		binary.BigEndian.PutUint32(Param_num[:], 0)
-		pConn.sendBuff.Write(Param_num[:])
-	} else {
-		fmt.Println("paramCount> 0: ", paramCount)
-		var networkBytes [2]byte
-		binary.BigEndian.PutUint16(networkBytes[:], uint16(paramCount))
-		pConn.sendBuff.Write(networkBytes[:])
-		fmt.Println("Param_num", pConn.sendBuff.Bytes())
 
-	}
+	var Param_num [4]byte
+	binary.BigEndian.PutUint32(Param_num[:], uint32(paramCount))
+	pConn.sendBuff.Write(Param_num[:])
+
 	fmt.Println("Param_num ", pConn.sendBuff.Bytes())
 
 	if values != nil {
@@ -115,20 +108,18 @@ func sockSendPutStatement(pConn *xuguConn, sql []byte, values *[]xuguValue, para
 			binary.BigEndian.PutUint16(Param_DType[:], uint16(value.types))
 			pConn.sendBuff.Write(Param_DType[:])
 			fmt.Println("Param_DType sendBuff ", pConn.sendBuff.Bytes())
+
 			//Param_Data_Len 根据DType 修改长度
-			//if value.types
-			var Param_Data_Len [4]byte
-			binary.BigEndian.PutUint32(Param_Data_Len[:], uint32(8))
+			Param_Data_Len := make([]byte, 4)
+			binary.BigEndian.PutUint32(Param_Data_Len[:], uint32(value.valueLength))
 			pConn.sendBuff.Write(Param_Data_Len[:])
 			fmt.Println("Param_Data_Len sendBuff ", pConn.sendBuff.Bytes())
-			//Param_Data 根据DType 修改长度
-			var Param_Data [7]byte
 
-			pConn.sendBuff.Write([]byte(Param_Data[:]))
+			//Param_Data 根据DType 修改长度
+			//Param_Data := make([]byte, value.valueLength)
 			pConn.sendBuff.Write([]byte(value.value))
-			//binary.BigEndian.PutUint32(networkBytes[:], 0)
-			// pConn.sendBuff.Write([]byte{0})
-			// fmt.Println("Param_Data sendBuff ", pConn.sendBuff.Bytes())
+
+			fmt.Println("Param_Data sendBuff ", pConn.sendBuff.Bytes())
 
 		}
 

+ 51 - 7
xugu/xugu_stmt.go

@@ -39,11 +39,6 @@ func (stmt *xuguStmt) Close() error {
 func (stmt *xuguStmt) NumInput() int {
 	fmt.Println("\n>>>>>(stmt *xuguStmt) NumInput()")
 
-	// parser := &xuguParse{
-	// 	bind_type:   0,
-	// 	param_count: 0,
-	// }
-
 	fmt.Println("stmt.mysql: ", stmt.mysql)
 	return assertParamCount(stmt.mysql)
 	//return 0
@@ -79,7 +74,7 @@ func (stmt *xuguStmt) Exec(args []driver.Value) (driver.Result, error) {
 	switch aR.rt {
 	case selectResult:
 
-		return nil, errors.New("select result type is error")
+		return nil, errors.New("exec is Query error")
 	case errInfo:
 
 		return nil, errors.New(string(aR.e.ErrStr))
@@ -96,5 +91,54 @@ func (stmt *xuguStmt) Exec(args []driver.Value) (driver.Result, error) {
 }
 
 func (stmt *xuguStmt) Query(args []driver.Value) (driver.Rows, error) {
-	return nil, nil
+
+	gt("stmt Query")
+	defer gt("stmt Query end")
+
+	stmt.stmt_conn.mu.Lock()
+	defer stmt.stmt_conn.mu.Unlock()
+
+	//send msg
+	//如果有参数
+	if stmt.paramCount > 0 && len(args) > 0 {
+		values := []xuguValue{}
+		for _, param := range args {
+			assertParamType(param, &values)
+			//fmt.Printf("Field Name: %s, Field Type: %s, Field Value: %v\n", v1, v1, v1)
+		}
+		sockSendPutStatement(stmt.stmt_conn, stmt.prename, &values, stmt.paramCount)
+		sockSendExecute(stmt.stmt_conn)
+		//没有参数
+	} else {
+		sockSendPutStatement(stmt.stmt_conn, []byte(stmt.mysql), nil, 0)
+		sockSendExecute(stmt.stmt_conn)
+	}
+
+	//recv msg
+	aR, err := xuguSockRecvMsg(stmt.stmt_conn)
+	if err != nil {
+		return nil, err
+	}
+
+	switch aR.rt {
+	case selectResult:
+		fmt.Printf("selectResult %#v\n", aR.s)
+		rows := &xuguRows{
+			rows_conn: stmt.stmt_conn,
+			results:   aR.s,
+			colIdx:    0,
+			prepared:  false,
+		}
+
+		return rows, nil
+	case errInfo:
+
+		return nil, errors.New(string(aR.e.ErrStr))
+	case warnInfo:
+
+		return nil, errors.New(string(aR.w.WarnStr))
+	default:
+	}
+
+	return nil, errors.New("xugu Query error")
 }

+ 41 - 0
xugu/xugu_tranx.go

@@ -0,0 +1,41 @@
+package xugu
+
+import "errors"
+
+type xuguTx struct {
+	tconn *xuguConn
+}
+
+func (tx *xuguTx) Commit() error {
+	tx.tconn.mu.Lock()
+	defer tx.tconn.mu.Unlock()
+
+	if tx.tconn == nil {
+		return errors.New("Invalid connection")
+	}
+	_, err := tx.tconn.exec("commit;", nil)
+
+	_, err = tx.tconn.exec("set auto_commit on;", nil)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (tx *xuguTx) Rollback() error {
+	tx.tconn.mu.Lock()
+	defer tx.tconn.mu.Unlock()
+	
+	if tx.tconn == nil {
+		return errors.New("Invalid connection")
+	}
+	_, err := tx.tconn.exec("rollback;", nil)
+	if err != nil {
+		return err
+	}
+	_, err = tx.tconn.exec("set auto_commit on;", nil)
+	if err != nil {
+		return err
+	}
+	return err
+}