16
16
17
17
package com .rabbitmq .client ;
18
18
19
+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
20
+ import static java .util .concurrent .TimeUnit .MINUTES ;
19
21
import static java .util .concurrent .TimeUnit .SECONDS ;
20
22
import static org .junit .jupiter .api .extension .ConditionEvaluationResult .disabled ;
21
23
import static org .junit .jupiter .api .extension .ConditionEvaluationResult .enabled ;
28
30
import io .netty .channel .EventLoopGroup ;
29
31
import io .netty .channel .MultiThreadIoEventLoopGroup ;
30
32
import io .netty .channel .nio .NioIoHandler ;
33
+
34
+ import java .io .PrintWriter ;
35
+ import java .io .StringWriter ;
36
+ import java .lang .management .LockInfo ;
37
+ import java .lang .management .ManagementFactory ;
38
+ import java .lang .management .MonitorInfo ;
39
+ import java .lang .management .RuntimeMXBean ;
40
+ import java .lang .management .ThreadInfo ;
31
41
import java .net .Socket ;
42
+ import java .nio .charset .StandardCharsets ;
43
+ import java .time .Duration ;
44
+ import java .time .LocalDateTime ;
45
+ import java .time .format .DateTimeFormatter ;
46
+ import java .util .List ;
32
47
import java .util .concurrent .ExecutorService ;
33
48
import java .util .concurrent .Executors ;
49
+ import java .util .concurrent .ScheduledExecutorService ;
50
+ import java .util .concurrent .ScheduledFuture ;
34
51
import java .util .concurrent .ThreadFactory ;
35
52
import java .util .concurrent .atomic .AtomicLong ;
53
+ import java .util .concurrent .atomic .AtomicReference ;
54
+ import java .util .stream .Collectors ;
55
+ import java .util .stream .Stream ;
36
56
37
57
import org .junit .jupiter .api .extension .AfterAllCallback ;
38
58
import org .junit .jupiter .api .extension .AfterEachCallback ;
@@ -59,6 +79,8 @@ public class AmqpClientTestExtension
59
79
private static final ExtensionContext .Namespace NAMESPACE =
60
80
ExtensionContext .Namespace .create (AmqpClientTestExtension .class );
61
81
82
+ private static final AtomicReference <String > CURRENT_TEST = new AtomicReference <>();
83
+
62
84
private static ExtensionContext .Store store (ExtensionContext extensionContext ) {
63
85
return extensionContext .getRoot ().getStore (NAMESPACE );
64
86
}
@@ -108,6 +130,16 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
108
130
@ Override
109
131
public void beforeAll (ExtensionContext context ) {
110
132
if (TestUtils .isNetty ()) {
133
+ Duration timeout = Duration .ofMinutes (20 );
134
+ ScheduledFuture <?> task = executor (context ).schedule (() -> {
135
+ try {
136
+ LOGGER .warn ("Test {} has been running for {}" , CURRENT_TEST .get (), timeout );
137
+ logThreadDump ();
138
+ } catch (Exception e ) {
139
+ LOGGER .warn ("Error during test timeout task" , e );
140
+ }
141
+ }, timeout .toMillis (), MILLISECONDS );
142
+ store (context ).put ("threadDumpTask" , task );
111
143
ThreadFactory tf = new NamedThreadFactory (context .getTestClass ().get ().getSimpleName () + "-" );
112
144
EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup (tf , NioIoHandler .newFactory ());
113
145
store (context )
@@ -118,11 +150,10 @@ public void beforeAll(ExtensionContext context) {
118
150
119
151
@ Override
120
152
public void beforeEach (ExtensionContext context ) {
121
- LOGGER .info (
122
- "Starting test: {}.{} (IO layer: {})" ,
123
- context .getTestClass ().get ().getSimpleName (),
124
- context .getTestMethod ().get ().getName (),
125
- TestUtils .IO_LAYER );
153
+ String test = String .format ("%s.%s" , context .getTestClass ().get ().getSimpleName (),
154
+ context .getTestMethod ().get ().getName ());
155
+ CURRENT_TEST .set (test );
156
+ LOGGER .info ("Starting test: {} (IO layer: {})" , test , TestUtils .IO_LAYER );
126
157
}
127
158
128
159
@ Override
@@ -137,34 +168,44 @@ public void afterEach(ExtensionContext context) {
137
168
public void afterAll (ExtensionContext context ) {
138
169
if (TestUtils .isNetty ()) {
139
170
TestUtils .resetEventLoopGroup ();
171
+ ScheduledFuture <?> threadDumpTask = store (context ).get ("threadDumpTask" , ScheduledFuture .class );
172
+ if (threadDumpTask != null ) {
173
+ threadDumpTask .cancel (true );
174
+ }
140
175
EventLoopGroup eventLoopGroup = eventLoopGroup (context );
141
- ExecutorServiceCloseableResourceWrapper wrapper =
142
- context
143
- .getRoot ()
144
- .getStore (ExtensionContext .Namespace .GLOBAL )
145
- .getOrComputeIfAbsent (ExecutorServiceCloseableResourceWrapper .class );
146
-
147
- wrapper
148
- .executorService
149
- .submit (
150
- () -> {
151
- try {
152
- eventLoopGroup .shutdownGracefully (0 , 0 , SECONDS ).get (10 , SECONDS );
153
- } catch (InterruptedException e ) {
154
- Thread .currentThread ().interrupt ();
155
- } catch (Exception e ) {
156
- LOGGER .warn ("Error while asynchronously closing Netty event loop group" , e );
157
- }
158
- });
176
+ ExecutorService executor = executor (context );
177
+
178
+ executor .submit (() -> {
179
+ try {
180
+ eventLoopGroup .shutdownGracefully (0 , 0 , SECONDS ).get (10 , SECONDS );
181
+ } catch (InterruptedException e ) {
182
+ Thread .currentThread ().interrupt ();
183
+ } catch (Exception e ) {
184
+ LOGGER .warn ("Error while asynchronously closing Netty event loop group" , e );
185
+ }
186
+ });
159
187
}
160
188
}
161
189
190
+ private static ScheduledExecutorService executor (ExtensionContext context ) {
191
+ ExecutorServiceCloseableResourceWrapper wrapper =
192
+ context
193
+ .getRoot ()
194
+ .getStore (ExtensionContext .Namespace .GLOBAL )
195
+ .getOrComputeIfAbsent (ExecutorServiceCloseableResourceWrapper .class );
196
+ return wrapper .executor ();
197
+ }
198
+
162
199
private static class ExecutorServiceCloseableResourceWrapper implements AutoCloseable {
163
200
164
- private final ExecutorService executorService ;
201
+ private final ScheduledExecutorService executorService ;
165
202
166
203
private ExecutorServiceCloseableResourceWrapper () {
167
- this .executorService = Executors .newCachedThreadPool ();
204
+ this .executorService = Executors .newScheduledThreadPool (2 );
205
+ }
206
+
207
+ private ScheduledExecutorService executor () {
208
+ return this .executorService ;
168
209
}
169
210
170
211
@ Override
@@ -197,4 +238,109 @@ public Thread newThread(Runnable r) {
197
238
return thread ;
198
239
}
199
240
}
241
+
242
+ private static void logThreadDump () {
243
+ PlainTextThreadDumpFormatter formatter = new PlainTextThreadDumpFormatter ();
244
+ ThreadInfo [] threadInfos =
245
+ ManagementFactory .getThreadMXBean ().dumpAllThreads (true , true );
246
+ String threadDump = formatter .format (threadInfos );
247
+ LOGGER .warn (threadDump );
248
+ }
249
+
250
+ // from Spring Boot's PlainTextThreadDumpFormatter
251
+ private static class PlainTextThreadDumpFormatter {
252
+
253
+ String format (ThreadInfo [] threads ) {
254
+ StringWriter dump = new StringWriter ();
255
+ PrintWriter writer = new PrintWriter (dump );
256
+ writePreamble (writer );
257
+ for (ThreadInfo info : threads ) {
258
+ writeThread (writer , info );
259
+ }
260
+ return dump .toString ();
261
+ }
262
+
263
+ private void writePreamble (PrintWriter writer ) {
264
+ DateTimeFormatter dateFormat = DateTimeFormatter .ofPattern ("yyyy-MM-dd HH:mm:ss" );
265
+ writer .println (dateFormat .format (LocalDateTime .now ()));
266
+ RuntimeMXBean runtime = ManagementFactory .getRuntimeMXBean ();
267
+ writer .printf (
268
+ "Full thread dump %s (%s %s):%n" ,
269
+ runtime .getVmName (), runtime .getVmVersion (), System .getProperty ("java.vm.info" ));
270
+ writer .println ();
271
+ }
272
+
273
+ private void writeThread (PrintWriter writer , ThreadInfo info ) {
274
+ writer .printf ("\" %s\" - Thread t@%d%n" , info .getThreadName (), info .getThreadId ());
275
+ writer .printf (" %s: %s%n" , Thread .State .class .getCanonicalName (), info .getThreadState ());
276
+ writeStackTrace (writer , info , info .getLockedMonitors ());
277
+ writer .println ();
278
+ writeLockedOwnableSynchronizers (writer , info );
279
+ writer .println ();
280
+ }
281
+
282
+ private void writeStackTrace (
283
+ PrintWriter writer , ThreadInfo info , MonitorInfo [] lockedMonitors ) {
284
+ int depth = 0 ;
285
+ for (StackTraceElement element : info .getStackTrace ()) {
286
+ writeStackTraceElement (
287
+ writer , element , info , lockedMonitorsForDepth (lockedMonitors , depth ), depth == 0 );
288
+ depth ++;
289
+ }
290
+ }
291
+
292
+ private List <MonitorInfo > lockedMonitorsForDepth (MonitorInfo [] lockedMonitors , int depth ) {
293
+ return Stream .of (lockedMonitors )
294
+ .filter ((lockedMonitor ) -> lockedMonitor .getLockedStackDepth () == depth )
295
+ .collect (Collectors .toList ());
296
+ }
297
+
298
+ private void writeStackTraceElement (
299
+ PrintWriter writer ,
300
+ StackTraceElement element ,
301
+ ThreadInfo info ,
302
+ List <MonitorInfo > lockedMonitors ,
303
+ boolean firstElement ) {
304
+ writer .printf ("\t at %s%n" , element .toString ());
305
+ LockInfo lockInfo = info .getLockInfo ();
306
+ if (firstElement && lockInfo != null ) {
307
+ if (element .getClassName ().equals (Object .class .getName ())
308
+ && element .getMethodName ().equals ("wait" )) {
309
+ writer .printf ("\t - waiting on %s%n" , format (lockInfo ));
310
+ } else {
311
+ String lockOwner = info .getLockOwnerName ();
312
+ if (lockOwner != null ) {
313
+ writer .printf (
314
+ "\t - waiting to lock %s owned by \" %s\" t@%d%n" ,
315
+ format (lockInfo ), lockOwner , info .getLockOwnerId ());
316
+ } else {
317
+ writer .printf ("\t - parking to wait for %s%n" , format (lockInfo ));
318
+ }
319
+ }
320
+ }
321
+ writeMonitors (writer , lockedMonitors );
322
+ }
323
+
324
+ private String format (LockInfo lockInfo ) {
325
+ return String .format ("<%x> (a %s)" , lockInfo .getIdentityHashCode (), lockInfo .getClassName ());
326
+ }
327
+
328
+ private void writeMonitors (PrintWriter writer , List <MonitorInfo > lockedMonitorsAtCurrentDepth ) {
329
+ for (MonitorInfo lockedMonitor : lockedMonitorsAtCurrentDepth ) {
330
+ writer .printf ("\t - locked %s%n" , format (lockedMonitor ));
331
+ }
332
+ }
333
+
334
+ private void writeLockedOwnableSynchronizers (PrintWriter writer , ThreadInfo info ) {
335
+ writer .println (" Locked ownable synchronizers:" );
336
+ LockInfo [] lockedSynchronizers = info .getLockedSynchronizers ();
337
+ if (lockedSynchronizers == null || lockedSynchronizers .length == 0 ) {
338
+ writer .println ("\t - None" );
339
+ } else {
340
+ for (LockInfo lockedSynchronizer : lockedSynchronizers ) {
341
+ writer .printf ("\t - Locked %s%n" , format (lockedSynchronizer ));
342
+ }
343
+ }
344
+ }
345
+ }
200
346
}
0 commit comments