collector.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776
  1. package collector
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "database/sql"
  7. "errors"
  8. "fmt"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/BurntSushi/toml"
  15. "github.com/go-kit/log"
  16. "github.com/go-kit/log/level"
  17. "github.com/prometheus/client_golang/prometheus"
  18. )
  19. // Exporter 收集 xugu 数据库的指标信息。它实现了 prometheus.Collector 接口。
  20. type Exporter struct {
  21. config *Config // 配置项
  22. mu *sync.Mutex // 互斥锁,用于保护并发访问
  23. metricsToScrape Metrics // 需要抓取的指标
  24. scrapeInterval *time.Duration // 抓取间隔
  25. dsn string // 数据库连接字符串
  26. duration, error prometheus.Gauge // prometheus Gauge,用于记录抓取持续时间和错误状态
  27. totalScrapes prometheus.Counter // prometheus Counter,记录总抓取次数
  28. scrapeErrors *prometheus.CounterVec // prometheus CounterVec,记录抓取错误的详细信息
  29. scrapeResults []prometheus.Metric // 抓取结果存储
  30. up prometheus.Gauge // prometheus Gauge,表示数据库是否可用
  31. db *sql.DB // 数据库连接对象
  32. logger log.Logger // 日志记录器
  33. }
  34. // Config 定义 Exporter 的配置
  35. type Config struct {
  36. DSN string // 数据库连接字符串
  37. MaxIdleConns int // 最大空闲连接数
  38. MaxOpenConns int // 最大打开连接数
  39. CustomMetrics string // 自定义指标配置
  40. QueryTimeout int // 查询超时时间(秒)
  41. DefaultMetricsFile string // 默认指标文件路径
  42. }
  43. // CreateDefaultConfig 返回 Exporter 的默认配置
  44. // 注意:返回的配置中 DSN 是空的
  45. func CreateDefaultConfig() *Config {
  46. return &Config{
  47. MaxIdleConns: 0, // 默认最大空闲连接数为 0
  48. MaxOpenConns: 10, // 默认最大打开连接数为 10
  49. CustomMetrics: "", // 默认没有自定义指标
  50. QueryTimeout: 5, // 默认查询超时时间为 5 秒
  51. DefaultMetricsFile: "", // 默认没有指定指标文件
  52. }
  53. }
  54. // Metric 描述一个指标对象
  55. type Metric struct {
  56. Context string // 指标上下文(例如查询的数据库信息)
  57. Labels []string // 指标标签
  58. MetricsDesc map[string]string // 指标描述
  59. MetricsType map[string]string // 指标类型(如 Gauge 或 Counter)
  60. MetricsBuckets map[string]map[string]string // 指标的直方图桶信息
  61. FieldToAppend string // 需要附加的字段
  62. Request string // 指标抓取的 SQL 查询
  63. IgnoreZeroResult bool // 是否忽略结果为零的情况
  64. }
  65. // Metrics 是 prometheus 指标的容器结构
  66. type Metrics struct {
  67. Metric []Metric `json:"metrics"` // 包含多个指标对象
  68. }
  69. var (
  70. additionalMetrics Metrics // 附加的指标
  71. hashMap = make(map[int][]byte) // 用于存储额外信息的哈希表
  72. namespace = "xugudb" // 指标命名空间
  73. exporterName = "exporter" // 导出器名称
  74. )
  75. // maskDsn 用于屏蔽数据库连接字符串中的敏感信息
  76. func maskDsn(dsn string) string {
  77. parts := strings.Split(dsn, "@") // 将连接字符串按 "@" 分割
  78. if len(parts) > 1 {
  79. maskedURL := "***@" + parts[1] // 将用户名部分替换为 "***"
  80. return maskedURL
  81. }
  82. return dsn
  83. }
  84. // NewExporter 创建一个新的 Exporter 实例
  85. func NewExporter(logger log.Logger, cfg *Config) (*Exporter, error) {
  86. e := &Exporter{
  87. mu: &sync.Mutex{}, // 初始化互斥锁
  88. dsn: cfg.DSN, // 数据库连接字符串
  89. // 定义指标:上次抓取持续时间
  90. duration: prometheus.NewGauge(prometheus.GaugeOpts{
  91. Namespace: namespace,
  92. Subsystem: exporterName,
  93. Name: "last_scrape_duration_seconds",
  94. Help: "上次从 xugu 数据库抓取指标的持续时间(秒)。",
  95. }),
  96. // 定义指标:抓取总次数
  97. totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
  98. Namespace: namespace,
  99. Subsystem: exporterName,
  100. Name: "scrapes_total",
  101. Help: "从 xugu 数据库抓取指标的总次数。",
  102. }),
  103. // 定义指标:抓取错误总数(分 collector 分类)
  104. scrapeErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
  105. Namespace: namespace,
  106. Subsystem: exporterName,
  107. Name: "scrape_errors_total",
  108. Help: "从 xugu 数据库抓取指标时发生错误的总次数。",
  109. }, []string{"collector"}),
  110. // 定义指标:上次抓取是否出错(1 表示出错,0 表示成功)
  111. error: prometheus.NewGauge(prometheus.GaugeOpts{
  112. Namespace: namespace,
  113. Subsystem: exporterName,
  114. Name: "last_scrape_error",
  115. Help: "上次从 xugu 数据库抓取指标是否出错(1 表示出错,0 表示成功)。",
  116. }),
  117. // 定义指标:数据库是否可用
  118. up: prometheus.NewGauge(prometheus.GaugeOpts{
  119. Namespace: namespace,
  120. Name: "up",
  121. Help: "xugu 数据库服务器是否可用。",
  122. }),
  123. logger: logger, // 日志记录器
  124. config: cfg, // 配置信息
  125. }
  126. // 设置要抓取的默认指标
  127. e.metricsToScrape = e.DefaultMetrics()
  128. // 尝试连接数据库
  129. err := e.connect()
  130. return e, err
  131. }
  132. // connect 方法用于建立与 xugu 数据库的连接并进行配置。
  133. // 它将 DSN(Data Source Name)解析、连接数据库、配置连接池等。
  134. // 返回:
  135. // 如果连接成功,返回 nil。
  136. // 如果连接失败,返回相应的错误。
  137. func (e *Exporter) connect() error {
  138. // 解析 DSN (数据源名称),检查格式是否正确
  139. _, err := url.Parse(e.dsn)
  140. if err != nil {
  141. // 如果 DSN 格式错误,记录日志并返回错误
  142. level.Error(e.logger).Log("malformed DSN: ", maskDsn(e.dsn))
  143. return err
  144. }
  145. // 记录连接日志,使用 maskDsn 函数隐藏 DSN 的敏感信息
  146. level.Debug(e.logger).Log("launching connection: ", maskDsn(e.dsn))
  147. // 使用 sql.Open 打开与 xugu 数据库的连接
  148. fmt.Println("连接的虚谷数据库为: ", e.dsn)
  149. db, err := sql.Open("xugu", e.dsn)
  150. if err != nil {
  151. // 如果连接失败,记录日志并返回错误
  152. level.Error(e.logger).Log("error while connecting to", e.dsn)
  153. return err
  154. }
  155. // 设置最大空闲连接数,并记录日志
  156. level.Debug(e.logger).Log("set max idle connections to ", e.config.MaxIdleConns)
  157. db.SetMaxIdleConns(e.config.MaxIdleConns)
  158. // 设置最大打开连接数,并记录日志
  159. level.Debug(e.logger).Log("set max open connections to ", e.config.MaxOpenConns)
  160. db.SetMaxOpenConns(e.config.MaxOpenConns)
  161. // 连接成功,记录日志并将数据库连接对象赋值给 e.db
  162. level.Debug(e.logger).Log("successfully connected to: ", maskDsn(e.dsn))
  163. e.db = db
  164. // 返回 nil 表示连接成功
  165. return nil
  166. }
  167. // RunScheduledScrapes 是为想要设置定时抓取而非每次 Collect 调用时抓取的用户准备的。
  168. // 它会根据给定的时间间隔 (si) 定期从数据库抓取指标
  169. func (e *Exporter) RunScheduledScrapes(ctx context.Context, si time.Duration) {
  170. // 设置抓取的时间间隔
  171. e.scrapeInterval = &si
  172. // 创建一个定时器,每隔指定的时间间隔(si)触发一次
  173. ticker := time.NewTicker(si)
  174. // 确保函数退出时定时器能够停止
  175. defer ticker.Stop()
  176. // 启动一个无限循环来定期抓取指标
  177. for {
  178. select {
  179. // 每当定时器到达时间间隔时执行抓取操作
  180. case <-ticker.C:
  181. e.mu.Lock() // 获取锁,确保不会发生并发的抓取操作
  182. e.scheduledScrape() // 调用实际的抓取函数
  183. e.mu.Unlock() // 释放锁
  184. // 如果上下文被取消(通常是服务器关闭或者停止信号),退出循环
  185. case <-ctx.Done():
  186. return
  187. }
  188. }
  189. }
  190. func (e *Exporter) scheduledScrape() {
  191. // 创建一个用于传递指标的 channel,缓冲区大小为 5
  192. metricCh := make(chan prometheus.Metric, 5)
  193. // 创建一个 WaitGroup,确保所有指标收集完成后再返回
  194. wg := &sync.WaitGroup{}
  195. wg.Add(1)
  196. // 启动一个 goroutine 来处理收集到的指标
  197. go func() {
  198. // 当 goroutine 完成时减少 WaitGroup 的计数
  199. defer wg.Done()
  200. // 初始化 scrapeResults 列表,用于存储所有抓取到的指标
  201. e.scrapeResults = []prometheus.Metric{}
  202. // 不断从 metricCh 获取指标并添加到 scrapeResults
  203. for {
  204. scrapeResult, more := <-metricCh
  205. if more {
  206. // 将收到的指标添加到 scrapeResults 中
  207. e.scrapeResults = append(e.scrapeResults, scrapeResult)
  208. continue
  209. }
  210. // 当 channel 被关闭时,退出循环
  211. return
  212. }
  213. }()
  214. // 调用 scrape 方法抓取数据库指标,将结果发送到 metricCh
  215. e.scrape(metricCh)
  216. // 报告元数据指标
  217. metricCh <- e.duration // 记录持续时间指标
  218. metricCh <- e.totalScrapes // 记录总抓取次数指标
  219. metricCh <- e.error // 记录错误指标
  220. e.scrapeErrors.Collect(metricCh) // 收集并发送错误相关的指标
  221. // 发送 up 指标,表示抓取是否成功
  222. metricCh <- e.up
  223. // 关闭指标 channel,通知 goroutine 停止处理指标
  224. close(metricCh)
  225. // 等待 goroutine 完成,确保所有指标都已处理
  226. wg.Wait()
  227. }
  228. // Describe 描述由 数据库导出器导出的所有指标
  229. func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
  230. // 由于无法预先知道导出器会生成哪些指标,
  231. // 使用一种“简易描述方法”:执行一次 Collect 方法
  232. // 并将收集到的指标描述符发送到描述通道。
  233. // 问题在于需要连接 数据库,如果数据库当前不可用,
  234. // 描述符将是不完整的。然而,由于这是一个独立的导出器,
  235. // 而非在其他代码中作为库使用,因此最糟糕的情况是无法检测到
  236. // 导出器本身创建的不一致指标。此外,受监控的 数据库 实例
  237. // 发生变化可能会在运行时更改导出的指标。
  238. metricCh := make(chan prometheus.Metric) // 用于接收指标的通道
  239. doneCh := make(chan struct{}) // 用于通知完成的通道
  240. // 启动一个 goroutine,将指标描述符发送到描述通道
  241. go func() {
  242. for m := range metricCh {
  243. ch <- m.Desc() // 提取指标的描述符
  244. }
  245. close(doneCh) // 完成后关闭 doneCh
  246. }()
  247. e.Collect(metricCh) // 调用 Collect 方法收集指标
  248. close(metricCh) // 关闭指标通道
  249. <-doneCh // 等待完成通知
  250. }
  251. // Collect 实现了 prometheus.Collector 接口,用于收集指标数据
  252. func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
  253. // 如果有设置抓取间隔(scrapeInterval),且抓取间隔不为 0
  254. if e.scrapeInterval != nil && *e.scrapeInterval != 0 {
  255. // 确保线程安全,进行只读访问
  256. e.mu.Lock()
  257. for _, r := range e.scrapeResults { // 将之前的抓取结果发送到指标通道
  258. ch <- r
  259. }
  260. e.mu.Unlock()
  261. return
  262. }
  263. // 如果没有抓取间隔,或者间隔为 0,则按请求进行抓取
  264. e.mu.Lock() // 加锁以确保没有并发抓取操作
  265. defer e.mu.Unlock() // 解锁防止资源泄漏
  266. // 执行抓取操作
  267. e.scrape(ch)
  268. // 发送基础指标到通道
  269. ch <- e.duration // 上次抓取持续时间
  270. ch <- e.totalScrapes // 总抓取次数
  271. ch <- e.error // 上次抓取是否出错
  272. e.scrapeErrors.Collect(ch) // 抓取错误的详细统计
  273. ch <- e.up // 数据库是否可用
  274. }
  275. func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
  276. e.totalScrapes.Inc() // 增加总抓取次数计数器
  277. var err error
  278. var errmutex sync.Mutex
  279. // 延迟执行,用于记录抓取持续时间和错误状态
  280. defer func(begun time.Time) {
  281. e.duration.Set(time.Since(begun).Seconds()) // 设置持续时间指标
  282. if err == nil {
  283. e.error.Set(0) // 如果没有错误,设置错误指标为 0
  284. } else {
  285. e.error.Set(1) // 如果有错误,设置错误指标为 1
  286. }
  287. }(time.Now())
  288. // 检查数据库连接是否正常
  289. if err = e.db.Ping(); err != nil {
  290. if strings.Contains(err.Error(), "sql: database is closed") {
  291. level.Info(e.logger).Log("Reconnecting to DB") // 记录重新连接数据库的信息
  292. err = e.connect()
  293. if err != nil {
  294. level.Error(e.logger).Log("error reconnecting to DB", err.Error())
  295. }
  296. }
  297. }
  298. // 再次检查数据库连接
  299. if err = e.db.Ping(); err != nil {
  300. level.Error(e.logger).Log("error pinging xugu:", err.Error())
  301. e.up.Set(0) // 设置数据库不可用指标为 0
  302. return
  303. }
  304. level.Debug(e.logger).Log("Successfully pinged xugu database: ", maskDsn(e.dsn))
  305. e.up.Set(1) // 设置数据库可用指标为 1
  306. // 检查指标配置是否有变化
  307. if e.checkIfMetricsChanged() {
  308. e.reloadMetrics() // 重新加载指标配置
  309. }
  310. wg := sync.WaitGroup{} // 定义一个 WaitGroup,用于等待所有抓取协程完成
  311. // 遍历需要抓取的指标
  312. for _, metric := range e.metricsToScrape.Metric {
  313. wg.Add(1)
  314. metric := metric // 为闭包创建新的局部变量(避免闭包捕获循环变量的问题)
  315. f := func() {
  316. defer wg.Done() // 在函数结束时,通知 WaitGroup 完成
  317. // 输出调试信息,记录当前正在抓取的指标信息
  318. level.Debug(e.logger).Log("About to scrape metric: ")
  319. level.Debug(e.logger).Log("- Metric MetricsDesc: ", fmt.Sprintf("%+v", metric.MetricsDesc))
  320. level.Debug(e.logger).Log("- Metric Context: ", metric.Context)
  321. level.Debug(e.logger).Log("- Metric MetricsType: ", fmt.Sprintf("%+v", metric.MetricsType))
  322. level.Debug(e.logger).Log("- Metric MetricsBuckets: ", fmt.Sprintf("%+v", metric.MetricsBuckets), "(Ignored unless Histogram type)")
  323. level.Debug(e.logger).Log("- Metric Labels: ", fmt.Sprintf("%+v", metric.Labels))
  324. level.Debug(e.logger).Log("- Metric FieldToAppend: ", metric.FieldToAppend)
  325. level.Debug(e.logger).Log("- Metric IgnoreZeroResult: ", fmt.Sprintf("%+v", metric.IgnoreZeroResult))
  326. level.Debug(e.logger).Log("- Metric Request: ", metric.Request)
  327. // 检查请求是否为空
  328. if len(metric.Request) == 0 {
  329. level.Error(e.logger).Log("Error scraping for ", metric.MetricsDesc, ". Did you forget to define request in your metrics config file?")
  330. return
  331. }
  332. // 检查指标描述是否为空
  333. if len(metric.MetricsDesc) == 0 {
  334. level.Error(e.logger).Log("Error scraping for query", metric.Request, ". Did you forget to define metricsdesc in your metrics config file?")
  335. return
  336. }
  337. // 检查直方图类型的指标是否定义了桶(Buckets)
  338. for column, metricType := range metric.MetricsType {
  339. if metricType == "histogram" {
  340. _, ok := metric.MetricsBuckets[column]
  341. if !ok {
  342. level.Error(e.logger).Log("Unable to find MetricsBuckets configuration key for metric. (metric=" + column + ")")
  343. return
  344. }
  345. }
  346. }
  347. scrapeStart := time.Now() // 记录抓取开始时间
  348. // 执行实际的指标抓取操作
  349. if err1 := e.ScrapeMetric(e.db, ch, metric); err1 != nil {
  350. // 如果抓取过程中发生错误,记录错误信息并更新错误计数器
  351. errmutex.Lock()
  352. {
  353. err = err1
  354. }
  355. errmutex.Unlock()
  356. level.Error(e.logger).Log("scrapeMetricContext", metric.Context, "ScrapeDuration", time.Since(scrapeStart), "msg", err1.Error())
  357. e.scrapeErrors.WithLabelValues(metric.Context).Inc()
  358. } else {
  359. // 如果抓取成功,记录成功信息
  360. level.Debug(e.logger).Log("successfully scraped metric: ", metric.Context, metric.MetricsDesc, time.Since(scrapeStart))
  361. }
  362. }
  363. go f() // 启动一个协程执行抓取函数
  364. }
  365. wg.Wait() // 等待所有抓取协程完成
  366. }
  367. // ScrapeMetric 是接口方法,调用 scrapeGenericValues 使用 Metric 结构体的值来抓取指标
  368. func (e *Exporter) ScrapeMetric(db *sql.DB, ch chan<- prometheus.Metric, metricDefinition Metric) error {
  369. level.Debug(e.logger).Log("调用函数 ScrapeGenericValues()")
  370. // 调用 scrapeGenericValues 函数来抓取指标
  371. return e.scrapeGenericValues(db, ch, metricDefinition.Context, metricDefinition.Labels,
  372. metricDefinition.MetricsDesc, metricDefinition.MetricsType, metricDefinition.MetricsBuckets,
  373. metricDefinition.FieldToAppend, metricDefinition.IgnoreZeroResult,
  374. metricDefinition.Request)
  375. }
  376. // scrapeGenericValues 是一个通用的抓取指标的方法
  377. // 参数:
  378. // - db: 数据库连接对象,用于执行 SQL 查询。
  379. // - ch: 用于发送抓取到的 Prometheus 指标的通道。
  380. // - context: 指标的上下文,用于标识指标的来源或类型。
  381. // - labels: 指标的标签,用于给指标添加上下文信息。
  382. // - metricsDesc: 存储指标名称及其帮助信息的映射。
  383. // - metricsType: 存储指标类型(例如 gauge 或 histogram)的映射。
  384. // - metricsBuckets: 存储直方图类型指标的桶(buckets)配置。
  385. // - fieldToAppend: 用于动态附加到指标名称中的字段。
  386. // - ignoreZeroResult: 如果为 true,则忽略没有结果的指标,否则会返回错误。
  387. // - request: 执行数据库查询的 SQL 请求。
  388. func (e *Exporter) scrapeGenericValues(db *sql.DB, ch chan<- prometheus.Metric, context string, labels []string,
  389. metricsDesc map[string]string, metricsType map[string]string, metricsBuckets map[string]map[string]string, fieldToAppend string, ignoreZeroResult bool, request string) error {
  390. metricsCount := 0 // 记录抓取的指标数量
  391. // 通用解析器,用于处理查询结果行
  392. genericParser := func(row map[string]string) error {
  393. labelsValues := []string{} // 存储标签的值
  394. // 遍历标签数组,将对应标签的值添加到 labelsValues 中
  395. for _, label := range labels {
  396. labelsValues = append(labelsValues, row[strings.ToUpper(label)])
  397. }
  398. // 遍历所有指标,抓取其对应的值
  399. for metric, metricHelp := range metricsDesc {
  400. // 将指标值转换为浮动点数
  401. value, err := strconv.ParseFloat(strings.TrimSpace(row[strings.ToUpper(metric)]), 64)
  402. if err != nil {
  403. // 如果无法转换为浮动点数,跳过该指标
  404. level.Error(e.logger).Log("msg", "无法将当前值转换为浮动点数", "metric", metric, "metricHelp", metricHelp, "value", row[metric])
  405. continue
  406. }
  407. level.Debug(e.logger).Log("查询结果:", value)
  408. // 如果指标名称不使用附加字段(fieldToAppend),则使用普通的描述
  409. if strings.Compare(fieldToAppend, "") == 0 {
  410. desc := prometheus.NewDesc(
  411. prometheus.BuildFQName(namespace, context, metric), // 构建 Prometheus 描述符
  412. metricHelp, // 指标的帮助信息
  413. labels, // 指标的标签
  414. nil, // 没有附加标签
  415. )
  416. // 如果该指标是直方图类型
  417. if metricsType[strings.ToLower(metric)] == "histogram" {
  418. // 获取直方图的计数
  419. count, err := strconv.ParseUint(strings.TrimSpace(row["COUNT"]), 10, 64)
  420. if err != nil {
  421. // 如果无法转换计数值为整型,跳过该指标
  422. level.Error(e.logger).Log("无法将 count 值转换为整型 (metric=" + metric +
  423. ",metricHelp=" + metricHelp + ",value=<" + row["COUNT"] + ">)")
  424. continue
  425. }
  426. // 创建桶(buckets)并填充数据
  427. buckets := make(map[float64]uint64)
  428. for field, le := range metricsBuckets[strings.ToUpper(metric)] {
  429. lelimit, err := strconv.ParseFloat(strings.TrimSpace(le), 64)
  430. if err != nil {
  431. // 如果桶的限制值无法转换为浮动点数,跳过
  432. level.Error(e.logger).Log("无法将桶的限制值转换为浮动点数 (metric=" + metric +
  433. ",metricHelp=" + metricHelp + ",bucketlimit=<" + le + ">)")
  434. continue
  435. }
  436. fmt.Println("row[field]", row[field])
  437. counter, err := strconv.ParseUint(strings.TrimSpace(row[strings.ToUpper(field)]), 10, 64)
  438. if err != nil {
  439. // 如果桶的计数无法转换为整型,跳过
  440. level.Error(e.logger).Log("无法将桶计数值转换为整型 (metric=" + metric +
  441. ",metricHelp=" + metricHelp + ",value=<" + row[field] + ">)")
  442. continue
  443. }
  444. buckets[lelimit] = counter
  445. }
  446. // 将直方图指标发送到 Prometheus 通道
  447. ch <- prometheus.MustNewConstHistogram(desc, count, value, buckets, labelsValues...)
  448. } else {
  449. // 对于非直方图类型指标,直接将其发送到 Prometheus 通道
  450. ch <- prometheus.MustNewConstMetric(desc, getMetricType(metric, metricsType), value, labelsValues...)
  451. }
  452. } else {
  453. // 如果指标使用附加字段,则使用附加字段值构建描述符
  454. desc := prometheus.NewDesc(
  455. prometheus.BuildFQName(namespace, context, cleanName(row[fieldToAppend])),
  456. metricHelp,
  457. nil, nil, // 没有标签
  458. )
  459. // 处理直方图类型指标
  460. if metricsType[strings.ToLower(metric)] == "histogram" {
  461. // 获取直方图的计数
  462. count, err := strconv.ParseUint(strings.TrimSpace(row["count"]), 10, 64)
  463. if err != nil {
  464. // 如果无法转换计数值为整型,跳过该指标
  465. level.Error(e.logger).Log("无法将 count 值转换为整型 (metric=" + metric +
  466. ",metricHelp=" + metricHelp + ",value=<" + row["count"] + ">)")
  467. continue
  468. }
  469. // 创建桶(buckets)并填充数据
  470. buckets := make(map[float64]uint64)
  471. for field, le := range metricsBuckets[strings.ToUpper(metric)] {
  472. lelimit, err := strconv.ParseFloat(strings.TrimSpace(le), 64)
  473. if err != nil {
  474. // 如果桶的限制值无法转换为浮动点数,跳过
  475. level.Error(e.logger).Log("无法将桶的限制值转换为浮动点数 (metric=" + metric +
  476. ",metricHelp=" + metricHelp + ",bucketlimit=<" + le + ">)")
  477. continue
  478. }
  479. counter, err := strconv.ParseUint(strings.TrimSpace(row[field]), 10, 64)
  480. if err != nil {
  481. // 如果桶的计数无法转换为整型,跳过
  482. level.Error(e.logger).Log("无法将桶计数值转换为整型 (metric=" + metric +
  483. ",metricHelp=" + metricHelp + ",value=<" + row[field] + ">)")
  484. continue
  485. }
  486. buckets[lelimit] = counter
  487. }
  488. // 将直方图指标发送到 Prometheus 通道
  489. ch <- prometheus.MustNewConstHistogram(desc, count, value, buckets)
  490. } else {
  491. // 对于非直方图类型指标,直接将其发送到 Prometheus 通道
  492. ch <- prometheus.MustNewConstMetric(desc, getMetricType(metric, metricsType), value)
  493. }
  494. }
  495. metricsCount++ // 增加抓取的指标计数
  496. }
  497. return nil
  498. }
  499. level.Debug(e.logger).Log("调用函数 GeneratePrometheusMetrics()")
  500. // 调用函数生成 Prometheus 指标
  501. err := e.generatePrometheusMetrics(db, genericParser, request)
  502. level.Debug(e.logger).Log("ScrapeGenericValues() - metricsCount: ", metricsCount)
  503. if err != nil {
  504. return err
  505. }
  506. // 如果没有找到任何指标且未忽略零结果,则返回错误
  507. if !ignoreZeroResult && metricsCount == 0 {
  508. return errors.New("没有找到任何指标")
  509. }
  510. return err
  511. }
  512. // generatePrometheusMetrics 从数据库中执行 SQL 查询,解析查询结果并调用解析函数处理每一行数据。
  513. // 参数:
  514. // db: 需要查询的数据库连接。
  515. // parse: 一个处理每一行结果的函数,它接收一个映射(map)作为输入参数。
  516. // query: 要执行的 SQL 查询。
  517. // 返回:
  518. // 如果查询或处理过程中出现错误,返回错误。
  519. func (e *Exporter) generatePrometheusMetrics(db *sql.DB, parse func(row map[string]string) error, query string) error {
  520. // 设置查询超时,避免长时间阻塞
  521. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.config.QueryTimeout)*time.Second)
  522. defer cancel()
  523. // 执行 SQL 查询
  524. rows, err := db.QueryContext(ctx, query)
  525. // 检查是否超时
  526. if errors.Is(ctx.Err(), context.DeadlineExceeded) {
  527. return errors.New("xugu query timed out")
  528. }
  529. // 检查查询是否出错
  530. if err != nil {
  531. return err
  532. }
  533. // 获取查询结果的列名
  534. cols, err := rows.Columns()
  535. defer rows.Close()
  536. // 如果查询结果的列名获取失败,返回错误
  537. if err != nil {
  538. return err
  539. }
  540. // 遍历查询结果的每一行
  541. for rows.Next() {
  542. // 创建一个切片,用来表示每一列的值,以及一个切片,用来存储每列的指针
  543. columns := make([]interface{}, len(cols))
  544. columnPointers := make([]interface{}, len(cols))
  545. for i := range columns {
  546. columnPointers[i] = &columns[i]
  547. }
  548. // 将查询结果扫描到 columnPointers 切片中
  549. if err := rows.Scan(columnPointers...); err != nil {
  550. return err
  551. }
  552. // 创建一个 map,用来存储每列的值,列名作为 key
  553. m := make(map[string]string)
  554. for i, colName := range cols {
  555. // 获取列的值并将其格式化为字符串,存入 map 中
  556. val := columnPointers[i].(*interface{})
  557. //m[strings.ToLower(colName)] = fmt.Sprintf("%v", *val)
  558. m[strings.ToUpper(colName)] = fmt.Sprintf("%v", *val)
  559. }
  560. fmt.Println("m:", m)
  561. // 调用传入的解析函数处理每一行数据
  562. if err := parse(m); err != nil {
  563. return err
  564. }
  565. }
  566. // 查询和处理成功后返回 nil
  567. return nil
  568. }
  569. // getMetricType 函数根据传入的 metricType 和 metricsType 获取相应的 Prometheus 值类型 (prometheus.ValueType)
  570. // 参数:
  571. // metricType: 一个字符串,表示传入的指标类型
  572. // metricsType: 一个字典映射,包含了每个 metricType 对应的字符串类型
  573. // 返回:
  574. // 返回对应的 prometheus.ValueType
  575. func getMetricType(metricType string, metricsType map[string]string) prometheus.ValueType {
  576. // 定义一个映射,将字符串类型映射到 Prometheus 的 ValueType 类型
  577. var strToPromType = map[string]prometheus.ValueType{
  578. "gauge": prometheus.GaugeValue, // "gauge" 映射为 Prometheus 的 GaugeValue 类型
  579. "counter": prometheus.CounterValue, // "counter" 映射为 Prometheus 的 CounterValue 类型
  580. "histogram": prometheus.UntypedValue, // "histogram" 映射为 Prometheus 的 UntypedValue 类型
  581. }
  582. // 将传入的 metricType 转为小写并在 metricsType 字典中查找其对应的类型
  583. strType, ok := metricsType[strings.ToLower(metricType)]
  584. if !ok {
  585. // 如果在 metricsType 中没有找到对应的类型,则默认返回 prometheus.GaugeValue
  586. return prometheus.GaugeValue
  587. }
  588. // 查找对应的 Prometheus ValueType 类型
  589. valueType, ok := strToPromType[strings.ToLower(strType)]
  590. if !ok {
  591. // 如果在 strToPromType 中没有找到对应的 Prometheus 类型,则抛出 panic
  592. panic(errors.New("Error while getting prometheus type " + strings.ToLower(strType)))
  593. }
  594. // 返回对应的 Prometheus 类型
  595. return valueType
  596. }
  597. // checkIfMetricsChanged 方法检查配置文件中定义的自定义指标文件是否发生变化。
  598. // 如果文件发生了变化(通过计算哈希值进行比较),则返回 true,表示需要重新加载指标。
  599. // 返回:
  600. // 如果有文件发生变化,则返回 true,否则返回 false。
  601. func (e *Exporter) checkIfMetricsChanged() bool {
  602. // 遍历自定义指标文件的列表,分隔符为逗号
  603. for i, _customMetrics := range strings.Split(e.config.CustomMetrics, ",") {
  604. // 如果自定义指标文件名为空,跳过当前循环
  605. if len(_customMetrics) == 0 {
  606. continue
  607. }
  608. // 记录正在检查的文件
  609. level.Debug(e.logger).Log("checking modifications in following metrics definition file:", _customMetrics)
  610. // 创建一个新的 SHA256 哈希计算器
  611. h := sha256.New()
  612. // 使用 hashFile 函数计算文件的哈希值
  613. if err := hashFile(h, _customMetrics); err != nil {
  614. // 如果文件哈希计算失败,记录错误并返回 false
  615. level.Error(e.logger).Log("unable to get file hash", err.Error())
  616. return false
  617. }
  618. // 如果当前文件的哈希值与之前存储的哈希值不同,表示文件已更改
  619. if !bytes.Equal(hashMap[i], h.Sum(nil)) {
  620. // 记录文件已更改,触发重新加载指标
  621. level.Info(e.logger).Log(_customMetrics, "has been changed. Reloading metrics...")
  622. // 更新存储的哈希值
  623. hashMap[i] = h.Sum(nil)
  624. return true
  625. }
  626. }
  627. // 如果所有文件都没有变化,返回 false
  628. return false
  629. }
  630. // reloadMetrics 方法用于重新加载 Prometheus 指标。
  631. // 它首先清空当前的指标列表,加载默认的指标,然后根据配置加载自定义的指标。
  632. // 如果有自定义指标文件,则加载并解析这些文件,最终将所有指标添加到 e.metricsToScrape 中。
  633. // 返回值:
  634. // 无,直接操作实例的状态(e.metricsToScrape)。
  635. func (e *Exporter) reloadMetrics() {
  636. // 清空当前的指标列表
  637. e.metricsToScrape.Metric = []Metric{}
  638. // 加载默认指标
  639. defaultMetrics := e.DefaultMetrics()
  640. e.metricsToScrape.Metric = defaultMetrics.Metric
  641. // 如果有自定义指标配置,则加载它们
  642. if strings.Compare(e.config.CustomMetrics, "") != 0 {
  643. // 遍历自定义指标配置文件列表(由逗号分隔)
  644. for _, _customMetrics := range strings.Split(e.config.CustomMetrics, ",") {
  645. // 判断是 TOML 文件还是 YAML 文件,根据文件后缀调用不同的加载函数
  646. if strings.HasSuffix(_customMetrics, "toml") {
  647. // 加载 TOML 配置文件
  648. if err := loadTomlMetricsConfig(_customMetrics, &additionalMetrics); err != nil {
  649. // 如果加载失败,触发 panic
  650. panic(err)
  651. }
  652. }
  653. // 记录成功加载自定义指标
  654. level.Info(e.logger).Log("event", "Successfully loaded custom metrics from "+_customMetrics)
  655. // 输出调试信息,显示解析后的自定义指标内容
  656. level.Debug(e.logger).Log("custom metrics parsed content", fmt.Sprintf("%+v", additionalMetrics))
  657. // 将加载的自定义指标添加到 e.metricsToScrape.Metric 中
  658. e.metricsToScrape.Metric = append(e.metricsToScrape.Metric, additionalMetrics.Metric...)
  659. }
  660. } else {
  661. // 如果没有定义自定义指标,记录调试日志
  662. level.Debug(e.logger).Log("No custom metrics defined.")
  663. }
  664. }
  665. // loadTomlMetricsConfig 函数用于加载并解析 TOML 配置文件。
  666. // 参数:
  667. // _customMetrics: 自定义指标文件的路径。
  668. // metrics: 用于存储解析后的自定义指标的结构体。
  669. // 返回:
  670. // 如果解析成功,返回 nil;如果出现错误,返回相应的错误。
  671. func loadTomlMetricsConfig(_customMetrics string, metrics *Metrics) error {
  672. // 使用 toml.DecodeFile 解析 TOML 配置文件
  673. if _, err := toml.DecodeFile(_customMetrics, metrics); err != nil {
  674. // 如果解析失败,返回格式化的错误消息
  675. return fmt.Errorf("cannot read the metrics config %s: %w", _customMetrics, err)
  676. }
  677. // 解析成功,返回 nil
  678. return nil
  679. }