Skip to content

Commit a11a611

Browse files
authored
support connection pool (#672)
* support connection pool * more specs * add changelog * per instance config ability * more specs --------- Signed-off-by: Maciej Mensfeld <[email protected]>
1 parent e1b4be1 commit a11a611

File tree

5 files changed

+1245
-0
lines changed

5 files changed

+1245
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# WaterDrop changelog
22

33
## 2.8.8 (Unreleased)
4+
- [Feature] Add `WaterDrop::ConnectionPool` for efficient connection pooling using the proven `connection_pool` gem.
45
- [Change] Remove Ruby `3.1` specs according to the EOL schedule.
56

67
## 2.8.7 (2025-09-02)

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ group :development do
1414
end
1515

1616
group :test do
17+
gem 'connection_pool'
1718
gem 'ostruct'
1819
gem 'rspec'
1920
gem 'simplecov'

Gemfile.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ GEM
1010
remote: https://rubygems.org/
1111
specs:
1212
byebug (12.0.0)
13+
connection_pool (2.5.3)
1314
diff-lcs (1.6.2)
1415
docile (1.4.1)
1516
ffi (1.17.2)
@@ -98,6 +99,7 @@ PLATFORMS
9899

99100
DEPENDENCIES
100101
byebug
102+
connection_pool
101103
ostruct
102104
rspec
103105
simplecov

lib/waterdrop/connection_pool.rb

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
# frozen_string_literal: true
2+
3+
# WaterDrop library
4+
module WaterDrop
5+
# Connection pool wrapper for WaterDrop producers using the proven connection_pool gem.
6+
#
7+
# This provides a clean WaterDrop-specific API while leveraging the battle-tested,
8+
# connection_pool gem underneath. The wrapper hides the direct usage of the connection_pool
9+
# gem and provides WaterDrop-specific configuration.
10+
#
11+
# @example Basic usage
12+
# pool = WaterDrop::ConnectionPool.new(size: 10) do |config|
13+
# config.kafka = { 'bootstrap.servers': 'localhost:9092' }
14+
# config.deliver = true
15+
# end
16+
#
17+
# pool.with do |producer|
18+
# producer.produce_sync(topic: 'events', payload: 'data')
19+
# end
20+
#
21+
# @example Transactional producers with unique IDs
22+
# pool = WaterDrop::ConnectionPool.new(size: 5) do |config, index|
23+
# config.kafka = {
24+
# 'bootstrap.servers': 'localhost:9092',
25+
# 'transactional.id': "my-app-#{index}"
26+
# }
27+
# end
28+
#
29+
# @example Global connection pool
30+
# WaterDrop::ConnectionPool.setup(size: 20) do |config|
31+
# config.kafka = { 'bootstrap.servers': ENV['KAFKA_BROKERS'] }
32+
# end
33+
#
34+
# WaterDrop::ConnectionPool.with do |producer|
35+
# producer.produce_async(topic: 'events', payload: 'data')
36+
# end
37+
class ConnectionPool
38+
# Delegate key methods to underlying connection pool
39+
extend Forwardable
40+
41+
def_delegators :@pool, :with, :size, :available
42+
43+
class << self
44+
# Global connection pool instance
45+
attr_accessor :default_pool
46+
47+
# Sets up a global connection pool
48+
#
49+
# @param size [Integer] Pool size (default: 5)
50+
# @param timeout [Numeric] Connection timeout in seconds (default: 5)
51+
# @param producer_config [Proc] Block to configure each producer in the pool
52+
# @yield [config, index] Block to configure each producer in the pool, receives config and
53+
# pool index
54+
# @return [ConnectionPool] The configured global pool
55+
#
56+
# @example Basic setup
57+
# WaterDrop::ConnectionPool.setup(size: 15) do |config|
58+
# config.kafka = { 'bootstrap.servers': ENV['KAFKA_BROKERS'] }
59+
# config.deliver = true
60+
# end
61+
#
62+
# @example Transactional setup with unique IDs
63+
# WaterDrop::ConnectionPool.setup(size: 5) do |config, index|
64+
# config.kafka = {
65+
# 'bootstrap.servers': ENV['KAFKA_BROKERS'],
66+
# 'transactional.id': "my-app-#{index}"
67+
# }
68+
# end
69+
def setup(size: 5, timeout: 5, &producer_config)
70+
ensure_connection_pool_gem!
71+
72+
@default_pool = new(size: size, timeout: timeout, &producer_config)
73+
end
74+
75+
# Executes a block with a producer from the global pool
76+
#
77+
# @param block [Proc] Block to execute with a producer
78+
# @yield [producer] Producer from the global pool
79+
# @return [Object] Result of the block
80+
# @raise [RuntimeError] If no global pool is configured
81+
#
82+
# @example
83+
# WaterDrop::ConnectionPool.with do |producer|
84+
# producer.produce_sync(topic: 'events', payload: 'data')
85+
# end
86+
def with(&block)
87+
raise 'No global connection pool configured. Call setup first.' unless @default_pool
88+
89+
@default_pool.with(&block)
90+
end
91+
92+
# Get statistics about the global pool
93+
#
94+
# @return [Hash, nil] Pool statistics or nil if no global pool
95+
def stats
96+
return nil unless @default_pool
97+
98+
{
99+
size: @default_pool.size,
100+
available: @default_pool.available
101+
}
102+
end
103+
104+
# Shutdown the global connection pool
105+
def shutdown
106+
return unless @default_pool
107+
108+
@default_pool.shutdown
109+
@default_pool = nil
110+
end
111+
112+
# Reload the global connection pool
113+
def reload
114+
@default_pool&.reload
115+
end
116+
117+
# Check if the global connection pool is active (configured)
118+
#
119+
# @return [Boolean] true if global pool is configured, false otherwise
120+
def active?
121+
!@default_pool.nil?
122+
end
123+
124+
private
125+
126+
# Ensures the connection_pool gem is available (class method)
127+
# Only requires it when actually needed (lazy loading)
128+
def ensure_connection_pool_gem!
129+
return if defined?(::ConnectionPool)
130+
131+
require 'connection_pool'
132+
rescue LoadError
133+
raise LoadError, <<~ERROR
134+
WaterDrop::ConnectionPool requires the 'connection_pool' gem.
135+
136+
Add this to your Gemfile:
137+
gem 'connection_pool'
138+
139+
Then run:
140+
bundle install
141+
ERROR
142+
end
143+
end
144+
145+
# Creates a new WaterDrop connection pool
146+
#
147+
# @param size [Integer] Pool size (default: 5)
148+
# @param timeout [Numeric] Connection timeout in seconds (default: 5)
149+
# @param producer_config [Proc] Block to configure each producer in the pool
150+
# @yield [config, index] Block to configure each producer in the pool, receives config and
151+
# pool index
152+
def initialize(size: 5, timeout: 5, &producer_config)
153+
self.class.send(:ensure_connection_pool_gem!)
154+
155+
@producer_config = producer_config
156+
@pool_index = 0
157+
@pool_mutex = Mutex.new
158+
159+
@pool = ::ConnectionPool.new(size: size, timeout: timeout) do
160+
producer_index = @pool_mutex.synchronize { @pool_index += 1 }
161+
162+
WaterDrop::Producer.new do |config|
163+
if @producer_config.arity == 2
164+
@producer_config.call(config, producer_index)
165+
else
166+
@producer_config.call(config)
167+
end
168+
end
169+
end
170+
end
171+
172+
# Get pool statistics
173+
#
174+
# @return [Hash] Pool statistics
175+
def stats
176+
{
177+
size: @pool.size,
178+
available: @pool.available
179+
}
180+
end
181+
182+
# Shutdown the connection pool
183+
def shutdown
184+
@pool.shutdown do |producer|
185+
producer.close! if producer&.status&.active?
186+
end
187+
end
188+
189+
# Reload all connections in the pool
190+
# Useful for configuration changes or error recovery
191+
def reload
192+
@pool.reload do |producer|
193+
producer.close! if producer&.status&.active?
194+
end
195+
end
196+
197+
# Returns the underlying connection_pool instance
198+
# This allows access to advanced connection_pool features if needed
199+
#
200+
# @return [::ConnectionPool] The underlying connection pool
201+
attr_reader :pool
202+
end
203+
204+
# Convenience methods on the WaterDrop module for global pool access
205+
class << self
206+
# Execute a block with a producer from the global connection pool
207+
# Only available when connection pool is configured
208+
#
209+
# @param block [Proc] Block to execute with a producer
210+
# @yield [producer] Producer from the global pool
211+
# @return [Object] Result of the block
212+
#
213+
# @example
214+
# WaterDrop.with do |producer|
215+
# producer.produce_sync(topic: 'events', payload: 'data')
216+
# end
217+
def with(&block)
218+
ConnectionPool.with(&block)
219+
end
220+
221+
# Access the global connection pool
222+
#
223+
# @return [WaterDrop::ConnectionPool] The global pool
224+
#
225+
# @example
226+
# WaterDrop.pool.with do |producer|
227+
# producer.produce_async(topic: 'events', payload: 'data')
228+
# end
229+
def pool
230+
ConnectionPool.default_pool
231+
end
232+
end
233+
end

0 commit comments

Comments
 (0)