12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776 |
- package collector
- import (
- "bytes"
- "context"
- "crypto/sha256"
- "database/sql"
- "errors"
- "fmt"
- "net/url"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/BurntSushi/toml"
- "github.com/go-kit/log"
- "github.com/go-kit/log/level"
- "github.com/prometheus/client_golang/prometheus"
- )
// Exporter collects metrics from a xugu database and implements the
// prometheus.Collector interface (Describe/Collect).
type Exporter struct {
	config          *Config          // runtime settings (pool sizes, timeouts, custom metric files)
	mu              *sync.Mutex      // serializes scrapes and guards scrapeResults
	metricsToScrape Metrics          // currently active metric definitions (defaults + custom)
	scrapeInterval  *time.Duration   // non-nil and non-zero when RunScheduledScrapes drives scraping
	dsn             string           // data source name; may contain credentials — always mask before logging
	duration, error prometheus.Gauge // last scrape duration / last scrape error flag
	totalScrapes    prometheus.Counter
	scrapeErrors    *prometheus.CounterVec // scrape error count per collector context
	scrapeResults   []prometheus.Metric    // cache served by Collect in scheduled mode
	up              prometheus.Gauge       // 1 when the database answers Ping, 0 otherwise
	db              *sql.DB
	logger          log.Logger
}
// Config carries the exporter settings supplied at startup.
type Config struct {
	DSN                string // data source name; may embed credentials
	MaxIdleConns       int    // passed to sql.DB.SetMaxIdleConns
	MaxOpenConns       int    // passed to sql.DB.SetMaxOpenConns
	CustomMetrics      string // comma-separated list of custom metric definition files (TOML)
	QueryTimeout       int    // per-query timeout, in seconds
	DefaultMetricsFile string // NOTE(review): not read in this file; presumably consumed by DefaultMetrics — confirm
}
- func CreateDefaultConfig() *Config {
- return &Config{
- MaxIdleConns: 0,
- MaxOpenConns: 10,
- CustomMetrics: "",
- QueryTimeout: 5,
- DefaultMetricsFile: "",
- }
- }
// Metric describes one scrapeable metric group loaded from configuration:
// the SQL request to run and how its result columns map to Prometheus
// metrics, labels and histogram buckets.
type Metric struct {
	Context          string                       // middle segment of the fully-qualified metric name
	Labels           []string                     // result columns exported as label values
	MetricsDesc      map[string]string            // column -> help text; every key becomes a metric
	MetricsType      map[string]string            // column -> "gauge" | "counter" | "histogram"
	MetricsBuckets   map[string]map[string]string // histogram column -> (bucket column -> upper bound)
	FieldToAppend    string                       // column whose row value is appended to the metric name
	Request          string                       // SQL query producing the rows to parse
	IgnoreZeroResult bool                         // when true, an empty result set is not treated as an error
}
// Metrics is the top-level container for metric definitions.
// NOTE(review): custom files are decoded with toml.DecodeFile, which does not
// use the json tag — confirm the tag is intentional (e.g. for JSON dumps).
type Metrics struct {
	Metric []Metric `json:"metrics"`
}
var (
	// additionalMetrics buffers custom metric definitions parsed from the
	// user-supplied TOML files (see reloadMetrics).
	additionalMetrics Metrics
	// hashMap caches, keyed by the custom-metrics file index, the SHA-256
	// of each file's content so checkIfMetricsChanged can detect edits.
	hashMap = make(map[int][]byte)
	// namespace and exporterName form the prefix of every exported metric
	// name (e.g. xugudb_exporter_scrapes_total).
	namespace    = "xugudb"
	exporterName = "exporter"
)
// maskDsn hides the credential part of a DSN so it can be logged safely.
// Everything before the first '@' (typically "user:password") is replaced
// with "***"; a DSN without '@' is returned unchanged.
func maskDsn(dsn string) string {
	// SplitN with n=2 splits only at the FIRST '@'. The previous
	// strings.Split + parts[1] returned just the segment between the first
	// and second '@', silently dropping the host when the password itself
	// contained an '@'.
	parts := strings.SplitN(dsn, "@", 2)
	if len(parts) == 2 {
		return "***@" + parts[1]
	}
	return dsn
}
- func NewExporter(logger log.Logger, cfg *Config) (*Exporter, error) {
- e := &Exporter{
- mu: &sync.Mutex{},
- dsn: cfg.DSN,
-
- duration: prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: namespace,
- Subsystem: exporterName,
- Name: "last_scrape_duration_seconds",
- Help: "上次从 xugu 数据库抓取指标的持续时间(秒)。",
- }),
-
- totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: namespace,
- Subsystem: exporterName,
- Name: "scrapes_total",
- Help: "从 xugu 数据库抓取指标的总次数。",
- }),
-
- scrapeErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
- Namespace: namespace,
- Subsystem: exporterName,
- Name: "scrape_errors_total",
- Help: "从 xugu 数据库抓取指标时发生错误的总次数。",
- }, []string{"collector"}),
-
- error: prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: namespace,
- Subsystem: exporterName,
- Name: "last_scrape_error",
- Help: "上次从 xugu 数据库抓取指标是否出错(1 表示出错,0 表示成功)。",
- }),
-
- up: prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: namespace,
- Name: "up",
- Help: "xugu 数据库服务器是否可用。",
- }),
- logger: logger,
- config: cfg,
- }
-
- e.metricsToScrape = e.DefaultMetrics()
-
- err := e.connect()
- return e, err
- }
- func (e *Exporter) connect() error {
-
- _, err := url.Parse(e.dsn)
- if err != nil {
-
- level.Error(e.logger).Log("malformed DSN: ", maskDsn(e.dsn))
- return err
- }
-
- level.Debug(e.logger).Log("launching connection: ", maskDsn(e.dsn))
-
- fmt.Println("连接的虚谷数据库为: ", e.dsn)
- db, err := sql.Open("xugu", e.dsn)
- if err != nil {
-
- level.Error(e.logger).Log("error while connecting to", e.dsn)
- return err
- }
-
- level.Debug(e.logger).Log("set max idle connections to ", e.config.MaxIdleConns)
- db.SetMaxIdleConns(e.config.MaxIdleConns)
-
- level.Debug(e.logger).Log("set max open connections to ", e.config.MaxOpenConns)
- db.SetMaxOpenConns(e.config.MaxOpenConns)
-
- level.Debug(e.logger).Log("successfully connected to: ", maskDsn(e.dsn))
- e.db = db
-
- return nil
- }
- func (e *Exporter) RunScheduledScrapes(ctx context.Context, si time.Duration) {
-
- e.scrapeInterval = &si
-
- ticker := time.NewTicker(si)
-
- defer ticker.Stop()
-
- for {
- select {
-
- case <-ticker.C:
- e.mu.Lock()
- e.scheduledScrape()
- e.mu.Unlock()
-
- case <-ctx.Done():
- return
- }
- }
- }
- func (e *Exporter) scheduledScrape() {
-
- metricCh := make(chan prometheus.Metric, 5)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- go func() {
-
- defer wg.Done()
-
- e.scrapeResults = []prometheus.Metric{}
-
- for {
- scrapeResult, more := <-metricCh
- if more {
-
- e.scrapeResults = append(e.scrapeResults, scrapeResult)
- continue
- }
-
- return
- }
- }()
-
- e.scrape(metricCh)
-
- metricCh <- e.duration
- metricCh <- e.totalScrapes
- metricCh <- e.error
- e.scrapeErrors.Collect(metricCh)
-
- metricCh <- e.up
-
- close(metricCh)
-
- wg.Wait()
- }
- func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
-
-
-
-
-
-
-
-
- metricCh := make(chan prometheus.Metric)
- doneCh := make(chan struct{})
-
- go func() {
- for m := range metricCh {
- ch <- m.Desc()
- }
- close(doneCh)
- }()
- e.Collect(metricCh)
- close(metricCh)
- <-doneCh
- }
- func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
-
- if e.scrapeInterval != nil && *e.scrapeInterval != 0 {
-
- e.mu.Lock()
- for _, r := range e.scrapeResults {
- ch <- r
- }
- e.mu.Unlock()
- return
- }
-
- e.mu.Lock()
- defer e.mu.Unlock()
-
- e.scrape(ch)
-
- ch <- e.duration
- ch <- e.totalScrapes
- ch <- e.error
- e.scrapeErrors.Collect(ch)
- ch <- e.up
- }
// scrape pings the database (reconnecting once if the handle was closed),
// reloads metric definitions when their files changed, then scrapes every
// configured metric concurrently, sending results to ch. The deferred
// closure records the scrape duration and sets the error gauge from the
// final value of err, which the per-metric goroutines may set under errmutex.
func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
	e.totalScrapes.Inc()
	var err error
	var errmutex sync.Mutex

	// Runs after wg.Wait() below, so it observes any err written by the
	// scrape goroutines.
	defer func(begun time.Time) {
		e.duration.Set(time.Since(begun).Seconds())
		if err == nil {
			e.error.Set(0)
		} else {
			e.error.Set(1)
		}
	}(time.Now())

	// First ping: detect a closed handle and try to reconnect once.
	if err = e.db.Ping(); err != nil {
		if strings.Contains(err.Error(), "sql: database is closed") {
			level.Info(e.logger).Log("Reconnecting to DB")
			err = e.connect()
			if err != nil {
				level.Error(e.logger).Log("error reconnecting to DB", err.Error())
			}
		}
	}

	// Second ping: verify the (possibly fresh) connection actually works.
	if err = e.db.Ping(); err != nil {
		level.Error(e.logger).Log("error pinging xugu:", err.Error())
		e.up.Set(0)
		return
	}
	level.Debug(e.logger).Log("Successfully pinged xugu database: ", maskDsn(e.dsn))
	e.up.Set(1)

	// Hot-reload custom metric definitions when their files changed on disk.
	if e.checkIfMetricsChanged() {
		e.reloadMetrics()
	}
	wg := sync.WaitGroup{}

	for _, metric := range e.metricsToScrape.Metric {
		wg.Add(1)
		// Shadow the loop variable so each goroutine captures its own copy.
		metric := metric
		f := func() {
			defer wg.Done()

			level.Debug(e.logger).Log("About to scrape metric: ")
			level.Debug(e.logger).Log("- Metric MetricsDesc: ", fmt.Sprintf("%+v", metric.MetricsDesc))
			level.Debug(e.logger).Log("- Metric Context: ", metric.Context)
			level.Debug(e.logger).Log("- Metric MetricsType: ", fmt.Sprintf("%+v", metric.MetricsType))
			level.Debug(e.logger).Log("- Metric MetricsBuckets: ", fmt.Sprintf("%+v", metric.MetricsBuckets), "(Ignored unless Histogram type)")
			level.Debug(e.logger).Log("- Metric Labels: ", fmt.Sprintf("%+v", metric.Labels))
			level.Debug(e.logger).Log("- Metric FieldToAppend: ", metric.FieldToAppend)
			level.Debug(e.logger).Log("- Metric IgnoreZeroResult: ", fmt.Sprintf("%+v", metric.IgnoreZeroResult))
			level.Debug(e.logger).Log("- Metric Request: ", metric.Request)

			// A metric definition without a query cannot be scraped.
			if len(metric.Request) == 0 {
				level.Error(e.logger).Log("Error scraping for ", metric.MetricsDesc, ". Did you forget to define request in your metrics config file?")
				return
			}

			// Without metricsdesc there is nothing to export.
			if len(metric.MetricsDesc) == 0 {
				level.Error(e.logger).Log("Error scraping for query", metric.Request, ". Did you forget to define metricsdesc in your metrics config file?")
				return
			}

			// Every histogram-typed column must come with a bucket map.
			for column, metricType := range metric.MetricsType {
				if metricType == "histogram" {
					_, ok := metric.MetricsBuckets[column]
					if !ok {
						level.Error(e.logger).Log("Unable to find MetricsBuckets configuration key for metric. (metric=" + column + ")")
						return
					}
				}
			}
			scrapeStart := time.Now()

			if err1 := e.ScrapeMetric(e.db, ch, metric); err1 != nil {
				// Record the failure for the deferred error gauge; last
				// writer wins, which is enough for a boolean gauge.
				errmutex.Lock()
				{
					err = err1
				}
				errmutex.Unlock()
				level.Error(e.logger).Log("scrapeMetricContext", metric.Context, "ScrapeDuration", time.Since(scrapeStart), "msg", err1.Error())
				e.scrapeErrors.WithLabelValues(metric.Context).Inc()
			} else {

				level.Debug(e.logger).Log("successfully scraped metric: ", metric.Context, metric.MetricsDesc, time.Since(scrapeStart))
			}
		}
		// One goroutine per metric definition; wg gates the deferred
		// duration/error bookkeeping above.
		go f()
	}
	wg.Wait()
}
- func (e *Exporter) ScrapeMetric(db *sql.DB, ch chan<- prometheus.Metric, metricDefinition Metric) error {
- level.Debug(e.logger).Log("调用函数 ScrapeGenericValues()")
-
- return e.scrapeGenericValues(db, ch, metricDefinition.Context, metricDefinition.Labels,
- metricDefinition.MetricsDesc, metricDefinition.MetricsType, metricDefinition.MetricsBuckets,
- metricDefinition.FieldToAppend, metricDefinition.IgnoreZeroResult,
- metricDefinition.Request)
- }
- func (e *Exporter) scrapeGenericValues(db *sql.DB, ch chan<- prometheus.Metric, context string, labels []string,
- metricsDesc map[string]string, metricsType map[string]string, metricsBuckets map[string]map[string]string, fieldToAppend string, ignoreZeroResult bool, request string) error {
- metricsCount := 0
-
- genericParser := func(row map[string]string) error {
- labelsValues := []string{}
-
- for _, label := range labels {
- labelsValues = append(labelsValues, row[strings.ToUpper(label)])
- }
-
- for metric, metricHelp := range metricsDesc {
-
- value, err := strconv.ParseFloat(strings.TrimSpace(row[strings.ToUpper(metric)]), 64)
- if err != nil {
-
- level.Error(e.logger).Log("msg", "无法将当前值转换为浮动点数", "metric", metric, "metricHelp", metricHelp, "value", row[metric])
- continue
- }
- level.Debug(e.logger).Log("查询结果:", value)
-
- if strings.Compare(fieldToAppend, "") == 0 {
- desc := prometheus.NewDesc(
- prometheus.BuildFQName(namespace, context, metric),
- metricHelp,
- labels,
- nil,
- )
-
- if metricsType[strings.ToLower(metric)] == "histogram" {
-
- count, err := strconv.ParseUint(strings.TrimSpace(row["COUNT"]), 10, 64)
- if err != nil {
-
- level.Error(e.logger).Log("无法将 count 值转换为整型 (metric=" + metric +
- ",metricHelp=" + metricHelp + ",value=<" + row["COUNT"] + ">)")
- continue
- }
-
- buckets := make(map[float64]uint64)
- for field, le := range metricsBuckets[strings.ToUpper(metric)] {
- lelimit, err := strconv.ParseFloat(strings.TrimSpace(le), 64)
- if err != nil {
-
- level.Error(e.logger).Log("无法将桶的限制值转换为浮动点数 (metric=" + metric +
- ",metricHelp=" + metricHelp + ",bucketlimit=<" + le + ">)")
- continue
- }
- fmt.Println("row[field]", row[field])
- counter, err := strconv.ParseUint(strings.TrimSpace(row[strings.ToUpper(field)]), 10, 64)
- if err != nil {
-
- level.Error(e.logger).Log("无法将桶计数值转换为整型 (metric=" + metric +
- ",metricHelp=" + metricHelp + ",value=<" + row[field] + ">)")
- continue
- }
- buckets[lelimit] = counter
- }
-
- ch <- prometheus.MustNewConstHistogram(desc, count, value, buckets, labelsValues...)
- } else {
-
- ch <- prometheus.MustNewConstMetric(desc, getMetricType(metric, metricsType), value, labelsValues...)
- }
- } else {
-
- desc := prometheus.NewDesc(
- prometheus.BuildFQName(namespace, context, cleanName(row[fieldToAppend])),
- metricHelp,
- nil, nil,
- )
-
- if metricsType[strings.ToLower(metric)] == "histogram" {
-
- count, err := strconv.ParseUint(strings.TrimSpace(row["count"]), 10, 64)
- if err != nil {
-
- level.Error(e.logger).Log("无法将 count 值转换为整型 (metric=" + metric +
- ",metricHelp=" + metricHelp + ",value=<" + row["count"] + ">)")
- continue
- }
-
- buckets := make(map[float64]uint64)
- for field, le := range metricsBuckets[strings.ToUpper(metric)] {
- lelimit, err := strconv.ParseFloat(strings.TrimSpace(le), 64)
- if err != nil {
-
- level.Error(e.logger).Log("无法将桶的限制值转换为浮动点数 (metric=" + metric +
- ",metricHelp=" + metricHelp + ",bucketlimit=<" + le + ">)")
- continue
- }
- counter, err := strconv.ParseUint(strings.TrimSpace(row[field]), 10, 64)
- if err != nil {
-
- level.Error(e.logger).Log("无法将桶计数值转换为整型 (metric=" + metric +
- ",metricHelp=" + metricHelp + ",value=<" + row[field] + ">)")
- continue
- }
- buckets[lelimit] = counter
- }
-
- ch <- prometheus.MustNewConstHistogram(desc, count, value, buckets)
- } else {
-
- ch <- prometheus.MustNewConstMetric(desc, getMetricType(metric, metricsType), value)
- }
- }
- metricsCount++
- }
- return nil
- }
- level.Debug(e.logger).Log("调用函数 GeneratePrometheusMetrics()")
-
- err := e.generatePrometheusMetrics(db, genericParser, request)
- level.Debug(e.logger).Log("ScrapeGenericValues() - metricsCount: ", metricsCount)
- if err != nil {
- return err
- }
-
- if !ignoreZeroResult && metricsCount == 0 {
- return errors.New("没有找到任何指标")
- }
- return err
- }
- func (e *Exporter) generatePrometheusMetrics(db *sql.DB, parse func(row map[string]string) error, query string) error {
-
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.config.QueryTimeout)*time.Second)
- defer cancel()
-
- rows, err := db.QueryContext(ctx, query)
-
- if errors.Is(ctx.Err(), context.DeadlineExceeded) {
- return errors.New("xugu query timed out")
- }
-
- if err != nil {
- return err
- }
-
- cols, err := rows.Columns()
- defer rows.Close()
-
- if err != nil {
- return err
- }
-
- for rows.Next() {
-
- columns := make([]interface{}, len(cols))
- columnPointers := make([]interface{}, len(cols))
- for i := range columns {
- columnPointers[i] = &columns[i]
- }
-
- if err := rows.Scan(columnPointers...); err != nil {
- return err
- }
-
- m := make(map[string]string)
- for i, colName := range cols {
-
- val := columnPointers[i].(*interface{})
-
- m[strings.ToUpper(colName)] = fmt.Sprintf("%v", *val)
- }
- fmt.Println("m:", m)
-
- if err := parse(m); err != nil {
- return err
- }
- }
-
- return nil
- }
- func getMetricType(metricType string, metricsType map[string]string) prometheus.ValueType {
-
- var strToPromType = map[string]prometheus.ValueType{
- "gauge": prometheus.GaugeValue,
- "counter": prometheus.CounterValue,
- "histogram": prometheus.UntypedValue,
- }
-
- strType, ok := metricsType[strings.ToLower(metricType)]
- if !ok {
-
- return prometheus.GaugeValue
- }
-
- valueType, ok := strToPromType[strings.ToLower(strType)]
- if !ok {
-
- panic(errors.New("Error while getting prometheus type " + strings.ToLower(strType)))
- }
-
- return valueType
- }
- func (e *Exporter) checkIfMetricsChanged() bool {
-
- for i, _customMetrics := range strings.Split(e.config.CustomMetrics, ",") {
-
- if len(_customMetrics) == 0 {
- continue
- }
-
- level.Debug(e.logger).Log("checking modifications in following metrics definition file:", _customMetrics)
-
- h := sha256.New()
-
- if err := hashFile(h, _customMetrics); err != nil {
-
- level.Error(e.logger).Log("unable to get file hash", err.Error())
- return false
- }
-
- if !bytes.Equal(hashMap[i], h.Sum(nil)) {
-
- level.Info(e.logger).Log(_customMetrics, "has been changed. Reloading metrics...")
-
- hashMap[i] = h.Sum(nil)
- return true
- }
- }
-
- return false
- }
- func (e *Exporter) reloadMetrics() {
-
- e.metricsToScrape.Metric = []Metric{}
-
- defaultMetrics := e.DefaultMetrics()
- e.metricsToScrape.Metric = defaultMetrics.Metric
-
- if strings.Compare(e.config.CustomMetrics, "") != 0 {
-
- for _, _customMetrics := range strings.Split(e.config.CustomMetrics, ",") {
-
- if strings.HasSuffix(_customMetrics, "toml") {
-
- if err := loadTomlMetricsConfig(_customMetrics, &additionalMetrics); err != nil {
-
- panic(err)
- }
- }
-
- level.Info(e.logger).Log("event", "Successfully loaded custom metrics from "+_customMetrics)
-
- level.Debug(e.logger).Log("custom metrics parsed content", fmt.Sprintf("%+v", additionalMetrics))
-
- e.metricsToScrape.Metric = append(e.metricsToScrape.Metric, additionalMetrics.Metric...)
- }
- } else {
-
- level.Debug(e.logger).Log("No custom metrics defined.")
- }
- }
- func loadTomlMetricsConfig(_customMetrics string, metrics *Metrics) error {
-
- if _, err := toml.DecodeFile(_customMetrics, metrics); err != nil {
-
- return fmt.Errorf("cannot read the metrics config %s: %w", _customMetrics, err)
- }
-
- return nil
- }
|