diff --git a/plugins/connectors/jira/client.go b/plugins/connectors/jira/client.go new file mode 100644 index 00000000..69d25949 --- /dev/null +++ b/plugins/connectors/jira/client.go @@ -0,0 +1,263 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package jira + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + + log "github.com/cihub/seelog" + "infini.sh/framework/core/util" +) + +const ( + APIPathSearch = "/rest/api/2/search" + APIPathComments = "/rest/api/2/issue/%s/comment" + DefaultTimeout = 30 * time.Second +) + +type JiraClient struct { + baseURL string + httpClient *http.Client + config *Config + retryCount int +} + +func NewJiraClient(config *Config) (*JiraClient, error) { + if config.BaseURL == "" { + return nil, fmt.Errorf("base URL is required") + } + + // 确保 URL 格式正确 + baseURL := strings.TrimSuffix(config.BaseURL, "/") + + client := &JiraClient{ + baseURL: baseURL, + config: config, + httpClient: &http.Client{ + Timeout: DefaultTimeout, + }, + retryCount: MaxRetries, + } + + return client, nil +} + +func (c *JiraClient) SearchIssues(ctx context.Context, jql string, startAt, maxResults int) (*SearchResult, error) { + params := url.Values{} + params.Set("jql", jql) + params.Set("startAt", fmt.Sprintf("%d", startAt)) + params.Set("maxResults", fmt.Sprintf("%d", maxResults)) + params.Set("expand", "names,schema") + + // 添加字段过滤 + if len(c.config.Fields) > 0 { + fields := strings.Join(c.config.Fields, ",") + params.Set("fields", fields) + } + + apiURL := fmt.Sprintf("%s%s?%s", c.baseURL, APIPathSearch, params.Encode()) + + var result SearchResult + err := c.makeRequestWithRetry(ctx, "GET", apiURL, nil, &result) + if err != nil { + return nil, fmt.Errorf("failed to search issues: %w", err) + } + + return &result, nil +} + +func (c *JiraClient) GetIssueComments(ctx context.Context, issueKey string) (*CommentsResponse, error) { + apiURL := fmt.Sprintf("%s"+APIPathComments, c.baseURL, issueKey) + + var result CommentsResponse + err := c.makeRequestWithRetry(ctx, "GET", apiURL, nil, &result) + if err != nil { + return nil, fmt.Errorf("failed to get comments for issue %s: %w", issueKey, err) + } + + // 为每个评论设置关联的问题键 + for i := range result.Comments { + result.Comments[i].IssueKey = issueKey + } + + return &result, nil +} + +func (c *JiraClient) makeRequestWithRetry(ctx context.Context, method, url string, body io.Reader, result interface{}) error { + var lastErr error + + for attempt := 0; attempt <= c.retryCount; attempt++ { + if attempt > 0 { + // 指数退避 + backoff := time.Duration(1< 30*time.Second { + backoff = 30 * time.Second + } + + log.Debugf("[jira client] retrying request after %v (attempt %d/%d)", backoff, attempt, c.retryCount) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backoff): + } + } + + err := c.makeRequest(ctx, method, url, body, result) + if err == nil { + return nil + } + + lastErr = err + + // 检查是否应该重试 + if !c.shouldRetry(err) { + break + } + + log.Warnf("[jira client] request failed, will retry: %v", err) + } + + return fmt.Errorf("request failed after %d attempts: %w", c.retryCount+1, lastErr) +} + +func (c *JiraClient) makeRequest(ctx context.Context, method, url string, body io.Reader, result interface{}) error { + req, err := http.NewRequestWithContext(ctx, method, url, body) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + // 设置认证 + c.setAuth(req) + + // 设置请求头 + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "Coco-Server-Jira-Connector/1.0") + + log.Debugf("[jira client] making request: %s %s", method, url) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + log.Debugf("[jira client] response status: %d", resp.StatusCode) + + // 读取响应体 + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + // 检查 HTTP 状态码 + if err := c.checkHTTPStatus(resp.StatusCode, respBody); err != nil { + return err + } + + // 解析 JSON 响应 + if result != nil { + if err := json.Unmarshal(respBody, result); err != nil { + return fmt.Errorf("failed to parse JSON response: %w", err) + } + } + + return nil +} + +func (c *JiraClient) setAuth(req *http.Request) { + switch strings.ToLower(c.config.AuthType) { + case "api_token": + if c.config.Username != "" && c.config.APIToken != "" { + req.SetBasicAuth(c.config.Username, c.config.APIToken) + } + case "basic_auth", "": + if c.config.Username != "" && c.config.Password != "" { + req.SetBasicAuth(c.config.Username, c.config.Password) + } + default: + log.Warnf("[jira client] unsupported auth type: %s", c.config.AuthType) + } +} + +func (c *JiraClient) checkHTTPStatus(statusCode int, body []byte) error { + switch statusCode { + case http.StatusOK: + return nil + case http.StatusUnauthorized: + return &JiraError{ + StatusCode: statusCode, + Message: "authentication failed - check your credentials", + Retryable: false, + } + case http.StatusForbidden: + return &JiraError{ + StatusCode: statusCode, + Message: "access forbidden - check your permissions", + Retryable: false, + } + case http.StatusNotFound: + return &JiraError{ + StatusCode: statusCode, + Message: "resource not found", + Retryable: false, + } + case http.StatusTooManyRequests: + // 解析 Retry-After 头部 + return &JiraError{ + StatusCode: statusCode, + Message: "rate limit exceeded", + Retryable: true, + } + case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + return &JiraError{ + StatusCode: statusCode, + Message: fmt.Sprintf("server error: %d", statusCode), + Retryable: true, + } + default: + message := fmt.Sprintf("unexpected status code: %d", statusCode) + if len(body) > 0 && len(body) < 1000 { + message += fmt.Sprintf(", body: %s", string(body)) + } + return &JiraError{ + StatusCode: statusCode, + Message: message, + Retryable: statusCode >= 500, + } + } +} + +func (c *JiraClient) shouldRetry(err error) bool { + if jiraErr, ok := err.(*JiraError); ok { + return jiraErr.Retryable + } + + // 网络错误通常可以重试 + if util.IsNetworkError(err) { + return true + } + + return false +} + +// JiraError 自定义错误类型 +type JiraError struct { + StatusCode int + Message string + Retryable bool +} + +func (e *JiraError) Error() string { + return fmt.Sprintf("jira api error (status %d): %s", e.StatusCode, e.Message) +} diff --git a/plugins/connectors/jira/plugin.go b/plugins/connectors/jira/plugin.go new file mode 100644 index 00000000..198b89bc --- /dev/null +++ b/plugins/connectors/jira/plugin.go @@ -0,0 +1,334 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + + package jira + + import ( + "context" + "fmt" + "sync" + "time" + + log "github.com/cihub/seelog" + "infini.sh/coco/modules/common" + "infini.sh/coco/plugins/connectors" + "infini.sh/framework/core/global" + "infini.sh/framework/core/module" + "infini.sh/framework/core/queue" + "infini.sh/framework/core/util" + ) + + const ( + ConnectorJira = "jira" + DefaultPageSize = 50 + MaxRetries = 3 + SyncInterval = time.Minute * 30 + ) + + func init() { + module.RegisterUserPlugin(&Plugin{}) + } + + type Plugin struct { + connectors.BasePlugin + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + } + + func (p *Plugin) Name() string { + return ConnectorJira + } + + func (p *Plugin) Setup() { + p.BasePlugin.Init("connector.jira", "indexing jira issues", p) + } + + func (p *Plugin) Start() error { + p.mu.Lock() + defer p.mu.Unlock() + p.ctx, p.cancel = context.WithCancel(context.Background()) + return p.BasePlugin.Start(SyncInterval) + } + + func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.cancel != nil { + log.Infof("[jira connector] received stop signal, cancelling current scan") + p.cancel() + p.ctx = nil + p.cancel = nil + } + return nil + } + + func (p *Plugin) Scan(connector *common.Connector, datasource *common.DataSource) { + p.mu.Lock() + parentCtx := p.ctx + p.mu.Unlock() + + if parentCtx == nil { + _ = log.Warnf("[jira connector] plugin is stopped, skipping scan for datasource [%s]", datasource.Name) + return + } + + cfg := Config{} + err := connectors.ParseConnectorConfigure(connector, datasource, &cfg) + if err != nil { + _ = log.Errorf("[jira connector] parsing connector configuration failed: %v", err) + return + } + + log.Debugf("[jira connector] handling datasource: %v", cfg) + + if cfg.BaseURL == "" { + _ = log.Errorf("[jira connector] missing required configuration for datasource [%s]: base_url", datasource.Name) + return + } + + client, err := NewJiraClient(&cfg) + if err != nil { + _ = log.Errorf("[jira connector] failed to init Jira client for datasource [%s]: %v", datasource.Name, err) + return + } + + scanCtx, scanCancel := context.WithCancel(parentCtx) + defer scanCancel() + + p.scanIssues(scanCtx, client, connector, datasource, &cfg) + } + + func (p *Plugin) scanIssues(ctx context.Context, client *JiraClient, connector *common.Connector, datasource *common.DataSource, cfg *Config) { + jql := p.buildJQL(cfg, datasource) + log.Debugf("[jira connector] using JQL: %s", jql) + + startAt := 0 + maxResults := cfg.MaxResults + if maxResults <= 0 { + maxResults = DefaultPageSize + } + + for { + select { + case <-ctx.Done(): + log.Debugf("[jira connector] context cancelled, stopping scan") + return + default: + } + + if global.ShuttingDown() { + log.Infof("[jira connector] system is shutting down, stopping scan") + return + } + + searchResult, err := client.SearchIssues(ctx, jql, startAt, maxResults) + if err != nil { + _ = log.Errorf("[jira connector] failed to search issues: %v", err) + return + } + + if len(searchResult.Issues) == 0 { + break + } + + for _, issue := range searchResult.Issues { + select { + case <-ctx.Done(): + return + default: + } + + if global.ShuttingDown() { + return + } + + doc, err := p.transformIssueToDocument(&issue, datasource, cfg) + if err != nil { + _ = log.Errorf("[jira connector] failed to transform issue %s: %v", issue.Key, err) + continue + } + + // 获取评论(如果配置启用) + if cfg.IncludeComments { + comments, err := client.GetIssueComments(ctx, issue.Key) + if err != nil { + _ = log.Warnf("[jira connector] failed to get comments for issue %s: %v", issue.Key, err) + } else { + p.processComments(comments, datasource, cfg, connector) + } + } + + data := util.MustToJSONBytes(doc) + if err := queue.Push(p.Queue, data); err != nil { + _ = log.Errorf("[jira connector] failed to push document to queue: %v", err) + continue + } + } + + log.Infof("[jira connector] processed %d issues (startAt: %d)", len(searchResult.Issues), startAt) + + if startAt+maxResults >= searchResult.Total { + break + } + startAt += maxResults + } + } + + func (p *Plugin) buildJQL(cfg *Config, datasource *common.DataSource) string { + jql := "" + + // 项目过滤 + if len(cfg.Projects) > 0 { + projectList := "" + for i, project := range cfg.Projects { + if i > 0 { + projectList += ", " + } + projectList += fmt.Sprintf("\"%s\"", project) + } + jql += fmt.Sprintf("project in (%s)", projectList) + } + + // 问题类型过滤 + if len(cfg.IssueTypes) > 0 { + if jql != "" { + jql += " AND " + } + typeList := "" + for i, issueType := range cfg.IssueTypes { + if i > 0 { + typeList += ", " + } + typeList += fmt.Sprintf("\"%s\"", issueType) + } + jql += fmt.Sprintf("issuetype in (%s)", typeList) + } + + // 增量同步 + lastSync, err := connectors.GetLastSyncValue(datasource.ID) + if err == nil && lastSync != "" { + if jql != "" { + jql += " AND " + } + jql += fmt.Sprintf("updated >= \"%s\"", lastSync) + } + + // 自定义 JQL 过滤器 + if cfg.JQLFilter != "" { + if jql != "" { + jql += " AND " + } + jql += fmt.Sprintf("(%s)", cfg.JQLFilter) + } + + // 默认排序 + if jql != "" { + jql += " ORDER BY updated DESC" + } else { + jql = "ORDER BY updated DESC" + } + + return jql + } + + func (p *Plugin) transformIssueToDocument(issue *Issue, datasource *common.DataSource, cfg *Config) (*common.Document, error) { + doc := &common.Document{ + Source: common.DataSourceReference{ + ID: datasource.ID, + Type: "connector", + Name: datasource.Name, + }, + Type: ConnectorJira, + Icon: "jira", + System: datasource.System, + } + + doc.ID = util.MD5digest(fmt.Sprintf("%s-%s", datasource.ID, issue.Key)) + doc.Title = issue.Fields.Summary + doc.Content = issue.Fields.Description + doc.URL = fmt.Sprintf("%s/browse/%s", cfg.BaseURL, issue.Key) + + if issue.Fields.Project != nil { + doc.Category = issue.Fields.Project.Name + } + + if issue.Fields.Created != nil { + doc.Created = issue.Fields.Created + } + if issue.Fields.Updated != nil { + doc.Updated = issue.Fields.Updated + } + + if issue.Fields.Reporter != nil { + doc.Owner = &common.UserInfo{ + UserName: issue.Fields.Reporter.DisplayName, + UserID: issue.Fields.Reporter.AccountID, + } + } + + // 添加 Jira 特有的元数据 + doc.Metadata = map[string]interface{}{ + "jira_key": issue.Key, + "issue_type": issue.Fields.IssueType.Name, + "status": issue.Fields.Status.Name, + "priority": issue.Fields.Priority.Name, + "labels": issue.Fields.Labels, + "components": issue.Fields.Components, + } + + if issue.Fields.Assignee != nil { + doc.Metadata["assignee"] = map[string]interface{}{ + "display_name": issue.Fields.Assignee.DisplayName, + "account_id": issue.Fields.Assignee.AccountID, + } + } + + return doc, nil + } + + func (p *Plugin) processComments(comments *CommentsResponse, datasource *common.DataSource, cfg *Config, connector *common.Connector) { + for _, comment := range comments.Comments { + doc := &common.Document{ + Source: common.DataSourceReference{ + ID: datasource.ID, + Type: "connector", + Name: datasource.Name, + }, + Type: "jira_comment", + Icon: "comment", + System: datasource.System, + Title: fmt.Sprintf("Comment on %s", comment.IssueKey), + Content: comment.Body, + URL: fmt.Sprintf("%s/browse/%s?focusedCommentId=%s", cfg.BaseURL, comment.IssueKey, comment.ID), + } + + doc.ID = util.MD5digest(fmt.Sprintf("%s-comment-%s", datasource.ID, comment.ID)) + + if comment.Created != nil { + doc.Created = comment.Created + } + if comment.Updated != nil { + doc.Updated = comment.Updated + } + + if comment.Author != nil { + doc.Owner = &common.UserInfo{ + UserName: comment.Author.DisplayName, + UserID: comment.Author.AccountID, + } + } + + doc.Metadata = map[string]interface{}{ + "comment_id": comment.ID, + "issue_key": comment.IssueKey, + } + + data := util.MustToJSONBytes(doc) + if err := queue.Push(p.Queue, data); err != nil { + _ = log.Errorf("[jira connector] failed to push comment to queue: %v", err) + } + } + } \ No newline at end of file diff --git a/plugins/connectors/jira/retry.go b/plugins/connectors/jira/retry.go new file mode 100644 index 00000000..ba0ef967 --- /dev/null +++ b/plugins/connectors/jira/retry.go @@ -0,0 +1,109 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + + package jira + + import ( + "context" + "fmt" + "math" + "time" + + log "github.com/cihub/seelog" + ) + + // RetryConfig 重试配置 + type RetryConfig struct { + MaxRetries int + InitialInterval time.Duration + MaxInterval time.Duration + Multiplier float64 + } + + // DefaultRetryConfig 默认重试配置 + func DefaultRetryConfig() *RetryConfig { + return &RetryConfig{ + MaxRetries: 3, + InitialInterval: time.Second, + MaxInterval: 30 * time.Second, + Multiplier: 2.0, + } + } + + // RetryableFunc 可重试的函数类型 + type RetryableFunc func() error + + // WithRetry 执行带重试的函数 + func WithRetry(ctx context.Context, config *RetryConfig, fn RetryableFunc) error { + if config == nil { + config = DefaultRetryConfig() + } + + var lastErr error + interval := config.InitialInterval + + for attempt := 0; attempt <= config.MaxRetries; attempt++ { + if attempt > 0 { + log.Debugf("[jira retry] attempt %d/%d after %v", attempt, config.MaxRetries, interval) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(interval): + } + + // 计算下次重试间隔(指数退避) + interval = time.Duration(float64(interval) * config.Multiplier) + if interval > config.MaxInterval { + interval = config.MaxInterval + } + } + + err := fn() + if err == nil { + if attempt > 0 { + log.Infof("[jira retry] succeeded after %d attempts", attempt+1) + } + return nil + } + + lastErr = err + + // 检查是否应该重试 + if !shouldRetryError(err) { + log.Debugf("[jira retry] error is not retryable: %v", err) + break + } + + if attempt < config.MaxRetries { + log.Warnf("[jira retry] attempt %d failed, will retry: %v", attempt+1, err) + } + } + + return fmt.Errorf("operation failed after %d attempts: %w", config.MaxRetries+1, lastErr) + } + + // shouldRetryError 判断错误是否可重试 + func shouldRetryError(err error) bool { + if jiraErr, ok := err.(*JiraError); ok { + return jiraErr.Retryable + } + + // 其他类型的错误,如网络错误等 + return true + } + + // ExponentialBackoff 计算指数退避时间 + func ExponentialBackoff(attempt int, baseInterval time.Duration, maxInterval time.Duration) time.Duration { + if attempt <= 0 { + return baseInterval + } + + backoff := time.Duration(float64(baseInterval) * math.Pow(2, float64(attempt-1))) + if backoff > maxInterval { + backoff = maxInterval + } + + return backoff + } \ No newline at end of file diff --git a/plugins/connectors/jira/types.go b/plugins/connectors/jira/types.go new file mode 100644 index 00000000..1f486bdb --- /dev/null +++ b/plugins/connectors/jira/types.go @@ -0,0 +1,137 @@ +package jira + +import "time" + +// Config 定义 Jira 连接器配置 +type Config struct { + BaseURL string `config:"base_url" json:"base_url"` + AuthType string `config:"auth_type" json:"auth_type"` // basic_auth, api_token, oauth + Username string `config:"username" json:"username"` + Password string `config:"password" json:"password"` + APIToken string `config:"api_token" json:"api_token"` + Projects []string `config:"projects" json:"projects"` + IssueTypes []string `config:"issue_types" json:"issue_types"` + Fields []string `config:"fields" json:"fields"` + IncludeComments bool `config:"include_comments" json:"include_comments"` + IncludeAttachments bool `config:"include_attachments" json:"include_attachments"` + MaxResults int `config:"max_results" json:"max_results"` + JQLFilter string `config:"jql_filter" json:"jql_filter"` +} + +// SearchResult Jira 搜索结果 +type SearchResult struct { + Expand string `json:"expand"` + StartAt int `json:"startAt"` + MaxResults int `json:"maxResults"` + Total int `json:"total"` + Issues []Issue `json:"issues"` +} + +// Issue Jira 问题 +type Issue struct { + Expand string `json:"expand"` + ID string `json:"id"` + Self string `json:"self"` + Key string `json:"key"` + Fields IssueFields `json:"fields"` +} + +// IssueFields 问题字段 +type IssueFields struct { + Summary string `json:"summary"` + Description string `json:"description"` + IssueType *IssueType `json:"issuetype"` + Project *Project `json:"project"` + Status *Status `json:"status"` + Priority *Priority `json:"priority"` + Reporter *User `json:"reporter"` + Assignee *User `json:"assignee"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Labels []string `json:"labels"` + Components []Component `json:"components"` +} + +// IssueType 问题类型 +type IssueType struct { + Self string `json:"self"` + ID string `json:"id"` + Description string `json:"description"` + IconURL string `json:"iconUrl"` + Name string `json:"name"` + Subtask bool `json:"subtask"` +} + +// Project 项目 +type Project struct { + Self string `json:"self"` + ID string `json:"id"` + Key string `json:"key"` + Name string `json:"name"` + ProjectTypeKey string `json:"projectTypeKey"` +} + +// Status 状态 +type Status struct { + Self string `json:"self"` + Description string `json:"description"` + IconURL string `json:"iconUrl"` + Name string `json:"name"` + ID string `json:"id"` + StatusCategory StatusCategory `json:"statusCategory"` +} + +// StatusCategory 状态分类 +type StatusCategory struct { + Self string `json:"self"` + ID int `json:"id"` + Key string `json:"key"` + ColorName string `json:"colorName"` + Name string `json:"name"` +} + +// Priority 优先级 +type Priority struct { + Self string `json:"self"` + IconURL string `json:"iconUrl"` + Name string `json:"name"` + ID string `json:"id"` +} + +// User 用户 +type User struct { + Self string `json:"self"` + AccountID string `json:"accountId"` + DisplayName string `json:"displayName"` + EmailAddress string `json:"emailAddress"` + Active bool `json:"active"` + TimeZone string `json:"timeZone"` + AccountType string `json:"accountType"` +} + +// Component 组件 +type Component struct { + Self string `json:"self"` + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` +} + +// CommentsResponse 评论响应 +type CommentsResponse struct { + StartAt int `json:"startAt"` + MaxResults int `json:"maxResults"` + Total int `json:"total"` + Comments []Comment `json:"comments"` +} + +// Comment 评论 +type Comment struct { + Self string `json:"self"` + ID string `json:"id"` + Author *User `json:"author"` + Body string `json:"body"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + IssueKey string `json:"-"` // 手动设置,用于关联问题 +} \ No newline at end of file