Skip to content

Commit c780849

Browse files
committed
ensure request completion in lease keep alive to prevent orphaned threads
1 parent 5e51565 commit c780849

3 files changed

Lines changed: 37 additions & 21 deletions

File tree

etcdv3.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ Gem::Specification.new do |s|
2020
end
2121
s.add_dependency("grpc", "~> 1.6")
2222
s.add_development_dependency("rspec", "~> 3.6.0")
23+
s.add_development_dependency("pry")
2324
end

lib/etcdv3/lease.rb

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
11
class Etcdv3
22
class Lease
3-
class Enum < Array
4-
def proceed
3+
4+
# this is used for gRPC proxy compatibility so that we do not
5+
# mark as finished writing until we've received a response
6+
class BlockingRequest
7+
def initialize(request_op)
8+
@blocked = false
9+
@request_op = request_op
10+
end
11+
12+
def read_done!
513
@proceed = true
614
end
715

8-
def completed
9-
@completed
16+
def blocked?
17+
@blocked
1018
end
1119

1220
def each
13-
@completed = false
14-
i = 0
15-
while i < self.size do
16-
@proceed = false
17-
yield self[i]
18-
i+=1
19-
sleep 0.001 until @proceed == true
20-
end
21-
@completed = true
21+
@blocked = true
22+
23+
yield @request_op
24+
25+
sleep 0.001 until @proceed == true
26+
@blocked = false
2227
end
2328
end
2429

@@ -44,16 +49,20 @@ def lease_ttl(id, timeout: nil)
4449
end
4550

4651
def lease_keep_alive_once(id, timeout: nil)
47-
enum = Enum.new
48-
enum << Etcdserverpb::LeaseKeepAliveRequest.new(ID: id)
52+
request = BlockingRequest.new Etcdserverpb::LeaseKeepAliveRequest.new(ID: id)
4953
response = nil
50-
@stub.lease_keep_alive(enum, metadata: @metadata, deadline: deadline(timeout)).each do |resp|
51-
response = resp
52-
break;
54+
begin
55+
@stub.lease_keep_alive(request, metadata: @metadata, deadline: deadline(timeout)).each do |resp|
56+
response = resp
57+
break;
58+
end
59+
ensure
60+
request.read_done!
61+
while request.blocked?
62+
sleep 0.001
63+
end
5364
end
54-
enum.proceed
55-
sleep 0.001 until enum.completed
56-
response
65+
return response
5766
end
5867

5968
private

spec/etcdv3/lease_spec.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@
4141
stub = local_stub(Etcdv3::Lease, -1)
4242
expect { stub.lease_keep_alive_once(id) }.to raise_error(GRPC::DeadlineExceeded)
4343
end
44+
45+
it "doesn't orphan threads if there is a server error" do
46+
expect_any_instance_of(GRPC::BidiCall).to receive(:read_loop).and_raise(GRPC::DeadlineExceeded)
47+
stub = local_stub(Etcdv3::Lease, 2)
48+
expect { stub.lease_keep_alive_once(314159) rescue nil; sleep 0.5}.to_not change { Thread.list.size }
49+
end
4450
end
4551

4652
describe '#lease_ttl' do

0 commit comments

Comments
 (0)