handler.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. // Package handler 提供数据查询模块的HTTP请求处理器
  2. // 负责处理数据查询、SQL执行等相关API请求,协调前端请求与后端服务层的交互
  3. package handler
  4. import (
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "dbview/service/internal/common/databases/meta"
  9. "dbview/service/internal/common/response"
  10. api "dbview/service/internal/modules/data_query/api"
  11. svc "dbview/service/internal/modules/data_query/service"
  12. "github.com/gin-gonic/gin"
  13. "github.com/gorilla/websocket"
  14. )
  15. // Handler 数据查询处理器结构体
  16. // 封装了数据查询服务层的调用逻辑,处理HTTP请求和响应
  17. type Handler struct {
  18. svc *svc.DataService // 数据查询服务实例
  19. }
  20. // NewHandler 创建新的数据查询处理器
  21. // 参数:
  22. // - s: 数据查询服务实例
  23. //
  24. // 返回:
  25. // - *Handler: 初始化后的处理器实例
  26. func NewHandler(s *svc.DataService) *Handler {
  27. return &Handler{svc: s}
  28. }
  29. // QueryDataHandler 处理数据查询请求 POST /data/query
  30. // 支持同步和异步两种查询模式,根据请求参数决定执行方式
  31. // 请求体:api.QueryDataRequest 包含连接信息、查询路径、查询条件等
  32. // 响应:统一的数据操作响应格式,包含查询结果或异步任务ID
  33. func (h *Handler) QueryDataHandler(c *gin.Context) {
  34. // 解析请求参数,验证必填字段
  35. var req api.QueryDataRequest
  36. if err := c.ShouldBindJSON(&req); err != nil {
  37. // 参数绑定失败,返回客户端错误
  38. c.JSON(http.StatusBadRequest, response.Response{Code: 1, Msg: "请求参数错误", Error: err.Error()})
  39. return
  40. }
  41. // 调用服务层执行数据查询操作
  42. // 支持根据路径定位数据库对象,并应用查询条件进行数据筛选
  43. asyncMode := req.Async.Mode
  44. isAsync := asyncMode != api.AsyncModeSync
  45. // 将前端对大对象字段的控制传递到服务层
  46. includeLarge := req.IncludeLarge
  47. result, err := h.svc.QueryData(c.Request.Context(), req.ConnID, req.ConnInfo, req.Path, req.Query, isAsync, includeLarge)
  48. if err != nil {
  49. // 服务层调用失败,返回服务器内部错误
  50. c.JSON(http.StatusInternalServerError, response.Response{Code: 2, Msg: "查询数据失败", Error: err.Error()})
  51. return
  52. }
  53. // 根据查询模式处理不同的返回结果
  54. // 异步模式返回任务ID,同步模式返回查询结果
  55. if taskId, ok := result.(string); ok {
  56. // 异步查询:任务已提交到后台队列,返回任务ID供后续查询状态
  57. asyncResp := api.AsyncQueryResponse{
  58. Success: true,
  59. TaskID: taskId,
  60. Message: "查询已提交到任务队列",
  61. }
  62. // 如果客户端以 WebSocket 模式请求异步,则返回可用的 ws 链接
  63. if req.Async.Mode == api.AsyncModeWS {
  64. scheme := "ws"
  65. if c.Request.TLS != nil {
  66. scheme = "wss"
  67. }
  68. asyncResp.WsURL = fmt.Sprintf("%s://%s/database/data/tasks/ws", scheme, c.Request.Host)
  69. }
  70. c.JSON(http.StatusOK, response.Response{Code: 0, Msg: "ok", Data: asyncResp})
  71. return
  72. }
  73. // 同步查询:直接返回查询结果数据
  74. if queryResult, ok := result.(meta.QueryResult); ok {
  75. // 构建统一的数据操作响应,包含查询结果、执行时间和成功状态
  76. resp := api.DataOperationResponse{
  77. Success: true,
  78. Data: queryResult,
  79. Message: "查询完成",
  80. }
  81. c.JSON(http.StatusOK, response.Response{Code: 0, Msg: "ok", Data: resp})
  82. return
  83. }
  84. // 理论上不会到达这里,但为了代码健壮性保留错误处理
  85. c.JSON(http.StatusInternalServerError, response.Response{Code: 2, Msg: "未知的返回类型", Error: "service returned unexpected type"})
  86. }
  87. // GetTaskResultHandler 处理异步查询任务结果获取请求 GET /data/task/result
  88. // 用于查询异步数据查询任务的执行状态和结果
  89. // 当任务完成时返回查询结果数据,当任务进行中时只返回状态信息
  90. // 请求体:api.GetTaskResultRequest 包含任务ID
  91. // 响应:包含任务状态和可能的查询结果数据
  92. func (h *Handler) GetTaskResultHandler(c *gin.Context) {
  93. // 解析请求参数,获取要查询的任务ID
  94. var req api.GetTaskResultRequest
  95. if err := c.ShouldBindJSON(&req); err != nil {
  96. // 参数验证失败,返回客户端错误
  97. c.JSON(http.StatusBadRequest, response.Response{Code: 1, Msg: "请求参数错误", Error: err.Error()})
  98. return
  99. }
  100. // 调用服务层查询指定任务的执行结果和状态
  101. // 返回查询结果数据、任务状态和可能的错误信息
  102. result, status, err := h.svc.GetTaskResult(c.Request.Context(), req.TaskID)
  103. if err != nil {
  104. // 获取任务结果失败,返回服务器内部错误
  105. c.JSON(http.StatusInternalServerError, response.Response{Code: 2, Msg: "获取任务结果失败", Error: err.Error()})
  106. return
  107. }
  108. // 构建基础响应结构,包含任务状态信息
  109. resp := api.GetTaskResultResponse{
  110. Success: true,
  111. Status: status,
  112. Message: "获取任务结果成功",
  113. }
  114. // 如果任务已完成,填充查询结果数据
  115. // 只有在任务完成状态下才有实际的查询结果
  116. if status == "completed" {
  117. resp.Data = api.DataOperationResponse{
  118. Success: true,
  119. Data: result,
  120. Message: "查询完成",
  121. }
  122. }
  123. // 返回任务状态和可能的查询结果
  124. c.JSON(http.StatusOK, response.Response{Code: 0, Msg: "ok", Data: resp})
  125. }
  126. // TaskWSHandler 处理 WebSocket 连接,用于订阅任务的实时推送
  127. // 客户端需要先建立 websocket 连接,然后发送 JSON 消息订阅/退订任务:
  128. // {"action":"subscribe","taskId":"..."} / {"action":"unsubscribe","taskId":"..."}
  129. func (h *Handler) TaskWSHandler(c *gin.Context) {
  130. upgrader := websocket.Upgrader{
  131. CheckOrigin: func(r *http.Request) bool { return true },
  132. }
  133. conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
  134. if err != nil {
  135. return
  136. }
  137. // Register connection with manager to start pumps and receive inbound messages
  138. inbound, err := h.svc.RegisterWSConn(conn)
  139. if err != nil {
  140. _ = conn.Close()
  141. return
  142. }
  143. subscribed := make(map[string]struct{})
  144. // 监听 inbound 通道,处理客户端订阅/退订消息
  145. for msgB := range inbound {
  146. var msg struct {
  147. Action string `json:"action"`
  148. TaskID string `json:"taskId"`
  149. }
  150. if err := json.Unmarshal(msgB, &msg); err != nil {
  151. // 忽略解析错误
  152. continue
  153. }
  154. switch msg.Action {
  155. case "subscribe":
  156. if msg.TaskID == "" {
  157. continue
  158. }
  159. if err := h.svc.SubscribeTaskWS(conn, msg.TaskID); err == nil {
  160. subscribed[msg.TaskID] = struct{}{}
  161. _ = h.svc.SendWSMessage(conn, map[string]interface{}{"taskId": msg.TaskID, "action": "subscribed"})
  162. }
  163. case "unsubscribe":
  164. if msg.TaskID == "" {
  165. continue
  166. }
  167. _ = h.svc.UnsubscribeTaskWS(conn, msg.TaskID)
  168. delete(subscribed, msg.TaskID)
  169. _ = h.svc.SendWSMessage(conn, map[string]interface{}{"taskId": msg.TaskID, "action": "unsubscribed"})
  170. }
  171. }
  172. // inbound closed => connection closed; cleanup subscriptions
  173. for tid := range subscribed {
  174. _ = h.svc.UnsubscribeTaskWS(conn, tid)
  175. }
  176. }
  177. // ExecuteSQLHandler 处理SQL执行请求 POST /data/execute
  178. // 支持执行各种类型的SQL语句(SELECT、INSERT、UPDATE、DELETE、CREATE、DROP等)
  179. // 支持同步和异步两种执行模式,根据SQL复杂度和数据量自动选择或由客户端指定
  180. // 请求体:api.ExecuteSQLRequest 包含连接信息、SQL语句、参数等
  181. // 响应:统一的执行结果,包含执行状态、影响行数、可能的查询结果等信息
  182. func (h *Handler) ExecuteSQLHandler(c *gin.Context) {
  183. // 解析请求参数,验证SQL语句和连接信息
  184. var req api.ExecuteSQLRequest
  185. if err := c.ShouldBindJSON(&req); err != nil {
  186. // 参数绑定失败,返回客户端错误
  187. c.JSON(http.StatusBadRequest, response.Response{Code: 1, Msg: "请求参数错误", Error: err.Error()})
  188. return
  189. }
  190. // 调用服务层执行SQL语句
  191. // 支持参数化查询防止SQL注入,根据路径定位数据库对象
  192. isAsyncSQL := req.Async.Mode != api.AsyncModeSync
  193. result, err := h.svc.ExecuteSQL(c.Request.Context(), req.ConnID, req.ConnInfo, req.Path, req.SQL, req.Params, isAsyncSQL, req.IncludeLarge)
  194. if err != nil {
  195. // SQL执行失败,返回服务器内部错误
  196. c.JSON(http.StatusInternalServerError, response.Response{Code: 2, Msg: "执行SQL失败", Error: err.Error()})
  197. return
  198. }
  199. // 根据执行模式处理不同的返回结果
  200. // 异步模式返回任务ID,同步模式返回执行结果
  201. if taskId, ok := result.(string); ok {
  202. // 异步执行:SQL已提交到后台队列,返回任务ID供后续查询执行状态
  203. asyncResp := api.AsyncExecuteResponse{
  204. Success: true,
  205. TaskID: taskId,
  206. Message: "SQL已提交到任务队列",
  207. }
  208. if req.Async.Mode == api.AsyncModeWS {
  209. scheme := "ws"
  210. if c.Request.TLS != nil {
  211. scheme = "wss"
  212. }
  213. asyncResp.WsURL = fmt.Sprintf("%s://%s/database/data/tasks/ws", scheme, c.Request.Host)
  214. }
  215. c.JSON(http.StatusOK, response.Response{Code: 0, Msg: "ok", Data: asyncResp})
  216. return
  217. }
  218. // 同步执行:直接返回SQL执行结果
  219. if executeResult, ok := result.(meta.ExecuteResult); ok {
  220. // 将内部的ExecuteResult转换为统一的DataOperationResponse格式
  221. // 便于前端统一处理不同类型的数据库操作结果
  222. resp := api.DataOperationResponse{
  223. Success: executeResult.Success,
  224. SQLType: executeResult.SQLType, // SQL语句类型(SELECT/INSERT/UPDATE等)
  225. AffectedRows: executeResult.AffectedRows, // 受影响的行数
  226. LastInsertID: executeResult.LastInsertID, // 自增主键值(INSERT语句)
  227. ErrorMessage: executeResult.ErrorMessage, // 执行错误信息
  228. ExecutionTime: executeResult.ExecutionTime, // 执行耗时(毫秒)
  229. RowsReturned: executeResult.RowsReturned, // 返回的行数(SELECT语句)
  230. Message: "SQL执行完成",
  231. }
  232. // 只有SELECT语句才包含查询结果数据,其他语句(如INSERT/UPDATE)没有Data字段
  233. if executeResult.Data != nil {
  234. resp.Data = *executeResult.Data
  235. }
  236. c.JSON(http.StatusOK, response.Response{Code: 0, Msg: "ok", Data: resp})
  237. return
  238. }
  239. // 理论上不会到达这里,但为了代码健壮性保留错误处理
  240. c.JSON(http.StatusInternalServerError, response.Response{Code: 2, Msg: "未知的返回类型", Error: "service returned unexpected type"})
  241. }
  242. // GetExecuteTaskResultHandler 处理异步SQL执行任务结果获取请求 GET /data/execute/task/result
  243. // 用于查询异步SQL执行任务的执行状态和结果
  244. // 当任务完成时返回SQL执行结果(包括影响行数、可能的查询数据等)
  245. // 当任务失败时返回错误信息,当任务进行中时只返回状态信息
  246. // 请求体:api.GetExecuteTaskResultRequest 包含任务ID
  247. // 响应:包含任务状态和可能的SQL执行结果数据
  248. func (h *Handler) GetExecuteTaskResultHandler(c *gin.Context) {
  249. // 解析请求参数,获取要查询的任务ID
  250. var req api.GetExecuteTaskResultRequest
  251. if err := c.ShouldBindJSON(&req); err != nil {
  252. // 参数验证失败,返回客户端错误
  253. c.JSON(http.StatusBadRequest, response.Response{Code: 1, Msg: "请求参数错误", Error: err.Error()})
  254. return
  255. }
  256. // 调用服务层查询指定SQL执行任务的结果和状态
  257. // 返回执行结果数据、任务状态和可能的错误信息
  258. result, status, err := h.svc.GetExecuteTaskResult(c.Request.Context(), req.TaskID)
  259. if err != nil {
  260. // 获取任务结果失败,返回服务器内部错误
  261. c.JSON(http.StatusInternalServerError, response.Response{Code: 2, Msg: "获取任务结果失败", Error: err.Error()})
  262. return
  263. }
  264. // 构建基础响应结构,包含任务状态信息
  265. resp := api.GetExecuteTaskResultResponse{
  266. Success: true,
  267. Status: status,
  268. Message: "获取任务结果成功",
  269. }
  270. // 如果任务已完成,填充SQL执行结果数据
  271. // 包含执行成功状态、影响行数、执行时间等详细信息
  272. if status == "completed" {
  273. // 将内部的ExecuteResult转换为统一的DataOperationResponse格式
  274. // 便于前端统一处理不同类型的数据库操作结果
  275. resp.Data = api.DataOperationResponse{
  276. Success: result.Success, // SQL执行是否成功
  277. SQLType: result.SQLType, // SQL语句类型(SELECT/INSERT/UPDATE等)
  278. AffectedRows: result.AffectedRows, // 受影响的行数
  279. LastInsertID: result.LastInsertID, // 自增主键值(INSERT语句)
  280. ErrorMessage: result.ErrorMessage, // 执行错误信息
  281. ExecutionTime: result.ExecutionTime, // 执行耗时(毫秒)
  282. RowsReturned: result.RowsReturned, // 返回的行数(SELECT语句)
  283. }
  284. // 只有SELECT语句才包含查询结果数据,其他语句(如INSERT/UPDATE)没有Data字段
  285. if result.Data != nil {
  286. resp.Data.Data = *result.Data
  287. }
  288. }
  289. // 返回任务状态和可能的SQL执行结果
  290. c.JSON(http.StatusOK, response.Response{Code: 0, Msg: "ok", Data: resp})
  291. }