Skip to content

Commit 6ecc00e

Browse files
committed
Make channel manager closing idempotent
And improve synchronization on channel termination. Fixes #1725 (cherry picked from commit 8436e95)
1 parent 2698cf4 commit 6ecc00e

File tree

2 files changed

+44
-49
lines changed

2 files changed

+44
-49
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,9 @@ private ShutdownSignalException startShutdown(Method reason,
10281028

10291029
private void finishShutdown(ShutdownSignalException sse) {
10301030
ChannelManager cm = _channelManager;
1031-
if (cm != null) cm.handleSignal(sse);
1031+
if (cm != null) {
1032+
cm.handleSignal(sse);
1033+
}
10321034
}
10331035

10341036
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 41 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Map;
3131
import java.util.Set;
3232
import java.util.concurrent.*;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334

3435
/**
3536
* Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
@@ -38,16 +39,15 @@ public class ChannelManager {
3839

3940
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
4041

42+
private final AtomicBoolean closed = new AtomicBoolean(false);
4143
/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
4244
private final Object monitor = new Object();
4345
/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
44-
private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();
46+
private final Map<Integer, ChannelN> _channelMap = new HashMap<>();
4547
private final IntAllocator channelNumberAllocator;
4648

4749
private final ConsumerWorkService workService;
4850

49-
private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();
50-
5151
/** Maximum channel number available on this connection. */
5252
private final int _channelMax;
5353
private ExecutorService shutdownExecutor;
@@ -109,61 +109,54 @@ public ChannelN getChannel(int channelNumber) {
109109
* @param signal reason for shutdown
110110
*/
111111
public void handleSignal(final ShutdownSignalException signal) {
112-
Set<ChannelN> channels;
113-
synchronized(this.monitor) {
114-
channels = new HashSet<ChannelN>(_channelMap.values());
115-
}
116-
117-
for (final ChannelN channel : channels) {
118-
releaseChannelNumber(channel);
119-
// async shutdown if possible
120-
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
121-
Runnable channelShutdownRunnable = new Runnable() {
122-
@Override
123-
public void run() {
124-
channel.processShutdownSignal(signal, true, true);
125-
}
126-
};
127-
if(this.shutdownExecutor == null) {
128-
channelShutdownRunnable.run();
129-
} else {
130-
Future<?> channelShutdownTask = this.shutdownExecutor.submit(channelShutdownRunnable);
131-
try {
132-
channelShutdownTask.get(channelShutdownTimeout, TimeUnit.MILLISECONDS);
133-
} catch (Exception e) {
134-
LOGGER.warn("Couldn't properly close channel {} on shutdown after waiting for {} ms", channel.getChannelNumber(), channelShutdownTimeout);
135-
channelShutdownTask.cancel(true);
112+
if (this.closed.compareAndSet(false, true)) {
113+
Set<ChannelN> channels;
114+
synchronized(this.monitor) {
115+
channels = new HashSet<>(_channelMap.values());
116+
}
117+
Set<CountDownLatch> shutdownSet = new HashSet<>();
118+
for (final ChannelN channel : channels) {
119+
releaseChannelNumber(channel);
120+
// async shutdown if possible
121+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
122+
Runnable channelShutdownRunnable = () -> channel.processShutdownSignal(signal, true, true);
123+
if(this.shutdownExecutor == null) {
124+
channelShutdownRunnable.run();
125+
} else {
126+
Future<?> channelShutdownTask = this.shutdownExecutor.submit(channelShutdownRunnable);
127+
try {
128+
channelShutdownTask.get(channelShutdownTimeout, TimeUnit.MILLISECONDS);
129+
} catch (Exception e) {
130+
LOGGER.warn("Couldn't properly close channel {} on shutdown after waiting for {} ms", channel.getChannelNumber(), channelShutdownTimeout);
131+
channelShutdownTask.cancel(true);
132+
}
136133
}
134+
shutdownSet.add(channel.getShutdownLatch());
135+
channel.notifyListeners();
137136
}
138-
shutdownSet.add(channel.getShutdownLatch());
139-
channel.notifyListeners();
137+
scheduleShutdownProcessing(shutdownSet);
140138
}
141-
scheduleShutdownProcessing();
142139
}
143140

144-
private void scheduleShutdownProcessing() {
145-
final Set<CountDownLatch> sdSet = new HashSet<CountDownLatch>(shutdownSet);
141+
private void scheduleShutdownProcessing(Set<CountDownLatch> shutdownSet) {
146142
final ConsumerWorkService ssWorkService = workService;
147-
Runnable target = new Runnable() {
148-
@Override
149-
public void run() {
150-
for (CountDownLatch latch : sdSet) {
151-
try {
152-
int shutdownTimeout = ssWorkService.getShutdownTimeout();
153-
if (shutdownTimeout == 0) {
154-
latch.await();
155-
} else {
156-
boolean completed = latch.await(shutdownTimeout, TimeUnit.MILLISECONDS);
157-
if (!completed) {
158-
LOGGER.warn("Consumer dispatcher for channel didn't shutdown after waiting for {} ms", shutdownTimeout);
159-
}
143+
Runnable target = () -> {
144+
for (CountDownLatch latch : shutdownSet) {
145+
try {
146+
int shutdownTimeout = ssWorkService.getShutdownTimeout();
147+
if (shutdownTimeout == 0) {
148+
latch.await();
149+
} else {
150+
boolean completed = latch.await(shutdownTimeout, TimeUnit.MILLISECONDS);
151+
if (!completed) {
152+
LOGGER.warn("Consumer dispatcher for channel didn't shutdown after waiting for {} ms", shutdownTimeout);
160153
}
161-
} catch (Throwable e) {
162-
/*ignored*/
163154
}
155+
} catch (Throwable e) {
156+
/*ignored*/
164157
}
165-
ssWorkService.shutdown();
166158
}
159+
ssWorkService.shutdown();
167160
};
168161
if(this.shutdownExecutor != null) {
169162
shutdownExecutor.execute(target);

0 commit comments

Comments
 (0)