Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ require (
github.com/swaggo/swag v1.6.7
github.com/ungerik/go-dry v0.0.0-20210209114055-a3e162a9e62e
github.com/urfave/cli/v2 v2.8.1
github.com/vektah/gqlparser/v2 v2.5.1
github.com/lib/pq v1.10.2
golang.org/x/net v0.15.0
google.golang.org/grpc v1.50.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down
55 changes: 38 additions & 17 deletions sqle/server/auditplan/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@ type Meta struct {
}

const (
TypeDefault = "default"
TypeMySQLSlowLog = "mysql_slow_log"
TypeMySQLMybatis = "mysql_mybatis"
TypeMySQLSchemaMeta = "mysql_schema_meta"
TypeMySQLProcesslist = "mysql_processlist"
TypeAliRdsMySQLSlowLog = "ali_rds_mysql_slow_log"
TypeAliRdsMySQLAuditLog = "ali_rds_mysql_audit_log"
TypeHuaweiRdsMySQLSlowLog = "huawei_rds_mysql_slow_log"
TypeOracleTopSQL = "oracle_top_sql"
TypeTiDBAuditLog = "tidb_audit_log"
TypeAllAppExtract = "all_app_extract"
TypeBaiduRdsMySQLSlowLog = "baidu_rds_mysql_slow_log"
TypeSQLFile = "sql_file"
TypeDefault = "default"
TypeMySQLSlowLog = "mysql_slow_log"
TypeMySQLMybatis = "mysql_mybatis"
TypeMySQLSchemaMeta = "mysql_schema_meta"
TypeMySQLProcesslist = "mysql_processlist"
TypeAliRdsMySQLSlowLog = "ali_rds_mysql_slow_log"
TypeAliRdsMySQLAuditLog = "ali_rds_mysql_audit_log"
TypeOracleTopSQL = "oracle_top_sql"
TypeTiDBAuditLog = "tidb_audit_log"
TypeAllAppExtract = "all_app_extract"
TypeBaiduRdsMySQLSlowLog = "baidu_rds_mysql_slow_log"
TypeSQLFile = "sql_file"
TypePostgreSQLSchemaMeta = "Postgresql_schema_meta"
)

const (
InstanceTypeAll = ""
InstanceTypeMySQL = "MySQL"
InstanceTypeOracle = "Oracle"
InstanceTypeTiDB = "TiDB"
InstanceTypeAll = ""
InstanceTypeMySQL = "MySQL"
InstanceTypeOracle = "Oracle"
InstanceTypeTiDB = "TiDB"
InstanceTypePostgreSQL = "PostgreSQL"
)

const (
Expand Down Expand Up @@ -371,6 +372,26 @@ var Metas = []Meta{
InstanceType: InstanceTypeAll,
CreateTask: NewDefaultTask,
},
{
Type: TypePostgreSQLSchemaMeta,
Desc: "库表元数据",
InstanceType: InstanceTypePostgreSQL,
CreateTask: NewPostgreSQLSchemaMetaTask,
Params: []*params.Param{
{
Key: paramKeyCollectIntervalMinute,
Desc: "采集周期(分钟)",
Value: "60",
Type: params.ParamTypeInt,
},
{
Key: "collect_view",
Desc: "是否采集视图信息",
Value: "0",
Type: params.ParamTypeBool,
},
},
},
}

var MetaMap = map[string]Meta{}
Expand Down
267 changes: 267 additions & 0 deletions sqle/server/auditplan/task.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/actiontech/sqle/sqle/driver"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -1550,3 +1551,269 @@ func NewBaiduRdsMySQLSlowLogTask(entry *logrus.Entry, ap *model.AuditPlan) Task

return b
}

// PostgreSQL库表元数据
type PostgreSQLSchemaMetaTask struct {
*sqlCollector
}

func NewPostgreSQLSchemaMetaTask(entry *logrus.Entry, ap *model.AuditPlan) Task {
sqlCollector := newSQLCollector(entry, ap)
task := &PostgreSQLSchemaMetaTask{
sqlCollector,
}
sqlCollector.do = task.collectorDo
return task
}

func (at *PostgreSQLSchemaMetaTask) collectorDo() {
if at.ap.InstanceName == "" {
at.logger.Warnf("instance is not configured")
return
}
if at.ap.InstanceDatabase == "" {
at.logger.Warnf("instance schema is not configured")
return
}
instance, _, err := dms.GetInstanceInProjectByName(context.Background(), string(at.ap.ProjectId), at.ap.InstanceName)
if err != nil {
at.logger.Errorf("get pg instance in project by instanceName failed: %s\n", err)
return
}

pluginMgr := driver.GetPluginManager()
if !pluginMgr.IsOptionalModuleEnabled(instance.DbType, driverV2.OptionalModuleQuery) {
at.logger.Errorf("collect pg schema meta failed: %v",
driver.NewErrPluginAPINotImplement(driverV2.OptionalModuleQuery))
return
}
plugin, err := pluginMgr.OpenPlugin(at.logger, instance.DbType, &driverV2.Config{DSN: &driverV2.DSN{
Host: instance.Host,
Port: instance.Port,
User: instance.User,
Password: instance.Password,
AdditionalParams: instance.AdditionalParams,
DatabaseName: at.ap.InstanceDatabase,
}})
if err != nil {
at.logger.Errorf("connect to instance fail, error: %v", err)
return
}

schemas, err := at.GetAllUserSchemas(plugin)
if err != nil {
at.logger.Errorf("get database=%s schemas error: %s", at.ap.InstanceDatabase, err)
return
}
if len(schemas) == 0 {
at.logger.Errorf("get database=%s schemas empty error: %s", at.ap.InstanceDatabase, err)
return
}

finalTableSqls := make([]string, 0)
finalViewSqls := make([]string, 0)
for _, schema := range schemas {
tables, err := at.ShowSchemaTablesForPg(plugin, schema)
if err != nil {
at.logger.Errorf("get schema table fail, error: %s", err)
continue
}
for _, table := range tables {
tableSqls, err := at.ShowCreateTablesForPg(plugin, at.ap.InstanceDatabase, schema, table)
if err != nil {
at.logger.Errorf("show create table fail, error: %s", err)
continue
}
if len(tableSqls) > 0 {
finalTableSqls = append(finalTableSqls, tableSqls...)
}
}
if len(finalTableSqls) > 0 {
err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalTableSqls, schema))
if err != nil {
at.logger.Errorf("save table schema meta to storage fail, error: %s", err)
}
}
}

for _, schema := range schemas {
var views []string
if at.ap.Params.GetParam("collect_view").Bool() {
views, err = at.ShowSchemaViewsForPg(plugin, schema)
if err != nil {
at.logger.Errorf("get schema view fail, error: %s", err)
continue
}
}
for _, view := range views {
viewSqls, err := at.ShowCreateViewsForPg(plugin, at.ap.InstanceDatabase, schema, view)
if err != nil {
at.logger.Errorf("show create view fail, error: %s", err)
continue
}
if len(viewSqls) > 0 {
finalViewSqls = append(finalViewSqls, viewSqls...)
}
}
if len(finalViewSqls) > 0 {
err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalViewSqls, schema))
if err != nil {
at.logger.Errorf("save view schema meta to storage fail, error: %s", err)
}
}
}
}

func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin) ([]string, error) {
querySql := "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname != 'information_schema'"
return at.GetResultSqls(plugin, querySql)
}

func (at *PostgreSQLSchemaMetaTask) ShowSchemaTablesForPg(plugin driver.Plugin, schema string) ([]string, error) {
querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+
" where table_schema='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema)
return at.GetResultSqls(plugin, querySql)
}

func (at *PostgreSQLSchemaMetaTask) ShowSchemaViewsForPg(plugin driver.Plugin, schema string) ([]string, error) {
querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+
" where table_schema='%s' and TABLE_TYPE='VIEW'", schema)
return at.GetResultSqls(plugin, querySql)
}

func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, database, schema, tableName string) ([]string, error) {
tables := make([]string, 0)
tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName)
columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'",
database, schema, tableName)
// 获取列定义,多个英文逗号分割
columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+
"CASE "+
" WHEN lower(data_type) IN ('char', 'varchar', 'character', 'character varying') "+
" THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+
" WHEN lower(data_type) IN ('numeric', 'decimal') "+
" THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+
" WHEN lower(data_type) IN ('integer', 'smallint', 'bigint', 'text') THEN data_type "+
" ELSE data_type "+
" END "+
" || "+
" CASE "+
" WHEN column_default != '' THEN ' DEFAULT ' || column_default ELSE '' END "+
" || "+
" CASE "+
" WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+
" FROM information_schema.columns "+
" WHERE %s GROUP BY table_name", columnsCondition)
sqls, err := at.GetResultSqls(plugin, columns)
if err != nil {
at.logger.Errorf("search column definition error:%s\n", err)
return nil, err
}
if len(sqls) == 0 {
return tables, nil
}
tableDDl += strings.Join(sqls, "")
constraintsCondition := fmt.Sprintf("n.nspname = '%s' AND C.relname = '%s'", schema, tableName)
// 获取所有约束
constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+
" pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+
" FROM pg_catalog.pg_constraint r "+
" JOIN pg_catalog.pg_class C ON C.OID = r.conrelid "+
" JOIN pg_catalog.pg_namespace n ON n.OID = C.relnamespace "+
" WHERE %s", constraintsCondition)
sqls, err = at.GetResultSqls(plugin, constraints)
if err != nil {
at.logger.Errorf("search constraint definition error:%s\n", err)
return nil, err
}
for _, sqlContext := range sqls {
tableDDl += ",\n" + sqlContext
}
tableDDl += ")"
indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName)
// 获取索引
indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+
" WHERE %s", indexesCondition)
sqls, err = at.GetResultSqls(plugin, indexes)
if err != nil {
at.logger.Errorf("search index definition error:%s\n", err)
return nil, err
}
for _, sqlContent := range sqls {
if strings.Contains(sqlContent, "CREATE UNIQUE INDEX") {
continue
}
tableDDl += ";\n" + sqlContent
}
tables = append(tables, tableDDl)
return tables, nil
}

func (at *PostgreSQLSchemaMetaTask) ShowCreateViewsForPg(plugin driver.Plugin, database, schema, tableName string) ([]string, error) {
querySql := fmt.Sprintf(
"SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+
" AS create_view_statement "+
" FROM information_schema.views "+
" WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'",
database, schema, tableName)
return at.GetResultSqls(plugin, querySql)
}

func (at *PostgreSQLSchemaMetaTask) GetResultSqls(plugin driver.Plugin, sql string) ([]string, error) {
var ret []string
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()

result, err := plugin.Query(ctx, sql, &driverV2.QueryConf{TimeOutSecond: 120})
if err != nil {
at.logger.Errorf("plugin.Query() failed:%s\n", err)
return nil, err
}
rows := result.Rows
for _, row := range rows {
values := row.Values
if len(values) == 0 {
continue
}
sqlContent := values[0].Value
if len(sqlContent) == 0 {
continue
}
ret = append(ret, sqlContent)
}
return ret, nil
}

func (at *PostgreSQLSchemaMetaTask) Audit() (*AuditResultResp, error) {
task, err := getTaskWithInstanceByAuditPlan(at.ap, at.persist)
if err != nil {
return nil, err
}
return at.baseTask.audit(task)
}

func (at *PostgreSQLSchemaMetaTask) GetSQLs(args map[string]interface{}) ([]Head, []map[string] /* head name */ string, uint64, error) {
auditPlanSQLs, count, err := at.persist.GetAuditPlanSQLsByReq(args)
if err != nil {
return nil, nil, count, err
}
head, rows := buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs)
return head, rows, count, nil
}

func buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs []*model.AuditPlanSQLListDetail) ([]Head, []map[string] /* head name */ string) {
head := []Head{
{
Name: "sql",
Desc: "SQL语句",
Type: "sql",
},
}
rows := make([]map[string]string, 0, len(auditPlanSQLs))
for _, sql := range auditPlanSQLs {
rows = append(rows, map[string]string{
"sql": sql.SQLContent,
})
}
return head, rows
}