Skip to content

Conversation

futeng
Copy link

@futeng futeng commented Sep 7, 2025

Description

Add Apache Pulsar support as a new EventBus implementation for coze-studio, providing users with a powerful, enterprise-grade message queue option that enhances coze-studio's scalability and security capabilities.

Why Pulsar?

As coze-studio grows and handles larger data volumes, the platform requires a message queue solution that can scale seamlessly while maintaining strong consistency guarantees. Apache Pulsar offers:

  • Superior Message Ordering: Exclusive subscription mode ensures strict message ordering, critical for maintaining data consistency in complex workflows
  • High Throughput & Low Latency: Designed for cloud-native environments with exceptional performance characteristics
  • Enhanced Security: Built-in JWT authentication provides enterprise-grade security for message queue communications
  • Future-Ready Architecture: Multi-tenant, geo-replication capabilities that can support coze-studio's growth trajectory
  • Cloud-Native Design: Perfect alignment with coze-studio's containerized deployment model

Changes

  • Add Pulsar producer and consumer implementations
  • Integrate Pulsar into EventBus factory with COZE_MQ_TYPE=pulsar
  • Support JWT authentication for secure Pulsar connections
  • Add comprehensive Docker Compose configurations
  • Maintain full compatibility with existing EventBus interfaces
  • Use Exclusive subscription mode for better message ordering
  • Add Docker Compose configurations for Pulsar deployment

Configuration

Enable Pulsar EventBus

export COZE_MQ_TYPE="pulsar"
export MQ_NAME_SERVER="pulsar:6650"
export PULSAR_SERVICE_URL="pulsar://pulsar:6650"
export PULSAR_JWT_TOKEN="" # Optional JWT token for authentication

Testing

  • Unit tests for Pulsar implementation
  • Integration tests with Docker Compose
  • Verified compatibility with existing EventBus interfaces
  • Performance testing with message ordering validation

Breaking Changes

None. This is a purely additive feature that maintains full backward compatibility.

What type of PR is this?

feat: A new feature

Check the PR title.

  • This PR title match the format: <type>(optional scope): <description>
  • The description of this PR title is user-oriented and clear enough for others to understand.
  • Add documentation if the current PR requires user awareness at the usage level.

(Optional) Translate the PR title into Chinese.

feat: 添加 Apache Pulsar EventBus 实现

(Optional) More detailed description for this PR(en: English/zh: Chinese).

en: This PR introduces Apache Pulsar as a new message queue backend for coze-studio's EventBus system. Users can now configure COZE_MQ_TYPE=pulsar to use Pulsar instead of NSQ, Kafka, or RocketMQ. The implementation includes JWT authentication support, Docker Compose configurations for easy deployment, and maintains full compatibility with existing EventBus interfaces. The Exclusive subscription mode ensures better message ordering for event processing, making it ideal for production environments requiring high throughput and strong consistency guarantees.

zh(optional): 此 PR 为 coze-studio 的 EventBus 系统引入了 Apache Pulsar 作为新的消息队列后端。用户现在可以通过配置 COZE_MQ_TYPE=pulsar 来使用 Pulsar 替代 NSQ、Kafka 或 RocketMQ。该实现包含 JWT 认证支持、便于部署的 Docker Compose 配置,并与现有 EventBus 接口完全兼容。独占订阅模式确保了事件处理的更好消息顺序,非常适合需要高吞吐量和强一致性保证的生产环境。

(Optional) Which issue(s) this PR fixes:

N/A - This is a new feature enhancement

- Add Pulsar producer and consumer implementations
- Integrate Pulsar support into main EventBus factory
- Add configuration constants for Pulsar service URL and JWT token
- Update environment configuration to include Pulsar option
- Add comprehensive tests for Pulsar implementation
- Support all existing EventBus interfaces and options
- Use Exclusive subscription mode for better message ordering
- Add Docker Compose configurations for Pulsar deployment

This implementation allows coze-studio to use Apache Pulsar as a message queue
alongside the existing NSQ, Kafka, and RocketMQ options.
- Add Pulsar support to .env.debug.example
- Include PULSAR_SERVICE_URL and PULSAR_JWT_TOKEN configuration
- Update COZE_MQ_TYPE comment to include pulsar option
- Maintain consistency with main .env.example configuration
@CLAassistant
Copy link

CLAassistant commented Sep 7, 2025

CLA assistant check
All committers have signed the CLA.

}

// Create Pulsar client
fmt.Printf("[DEBUG] Creating Pulsar client with URL: %s\n", serviceURL)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logs

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK,logs 我整体调整下

@@ -0,0 +1,389 @@
name: coze-studio
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么加两个 docker-compose 文件?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fanlv 您好,

  • docker/docker-compose-external-pulsar.yml 这个启动的时候时候不包含 Pulsar 服务,主要用来模拟采用外部 Pulsar 作为 eventbus的场景。
  • docker/docker-compose-pulsar.ym 这个是默认启动一个 Pulsar 服务,方便直接使用的场景。

希望都能保留着啊。如果分优先级的话,希望优先保留docker/docker-compose-external-pulsar.yml,这个可以一键启动使用包含 Pulsar 作为 backend eventbus 版本的 coze

…Replace all fmt.Printf/fmt.Println with logs.Debugf/logs.Errorf in producer.go- Use t.Logf/t.Errorf in test files instead of fmt.Println- Maintain consistency with default EventBus implementations (NSQ style)- Ensure proper logging practices across the codebase
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants