1
1
import json
2
+ import time
2
3
from datetime import datetime
3
4
from time import sleep
4
5
from typing import Optional
5
6
6
7
from django .conf import settings
7
8
from django .core .serializers .json import DjangoJSONEncoder
8
9
10
+ import structlog
9
11
from rest_framework .exceptions import APIException
10
12
11
13
from posthog .schema import (
26
28
27
29
from posthog .api .services .query import process_query_dict
28
30
from posthog .clickhouse .client .execute_async import get_query_status
29
- from posthog .clickhouse .query_tagging import Product , tags_context
31
+ from posthog .clickhouse .query_tagging import Product , tag_queries , tags_context
30
32
from posthog .errors import ExposedCHQueryError
31
33
from posthog .hogql_queries .query_runner import ExecutionMode
32
34
from posthog .models .team .team import Team
39
41
TrendsResultsFormatter ,
40
42
)
41
43
44
+ logger = structlog .get_logger (__name__ )
45
+
46
+ TIMING_LOG_PREFIX = "[QUERY_EXECUTOR]"
47
+
42
48
SupportedQueryTypes = (
43
49
AssistantTrendsQuery
44
50
| TrendsQuery
@@ -74,7 +80,11 @@ def __init__(self, team: Team, utc_now_datetime: datetime):
74
80
self ._utc_now_datetime = utc_now_datetime
75
81
76
82
def run_and_format_query(
    self,
    query: SupportedQueryTypes,
    execution_mode: Optional[ExecutionMode] = None,
    insight_id=None,
    debug_timing=False,
) -> tuple[str, bool]:
    """
    Run a query and format the results with detailed fallback information.

    Args:
        query: The query to execute (one of SupportedQueryTypes).
        execution_mode: Optional execution-mode override; when None,
            execute_query selects an environment-appropriate default.
        insight_id: Optional insight ID tagged onto the ClickHouse query
            (used for insight search attribution).
        debug_timing: When True, emit verbose [QUERY_EXECUTOR] timing logs.

    Returns:
        Tuple of (results_string, used_fallback). used_fallback is True when
        formatter-based compression failed and raw JSON was returned instead.

    Raises:
        NotImplementedError: If the query type has no supported formatter.
        Exception: If query execution fails with descriptive error messages.
    """
    start_time = time.time()
    query_type = type(query).__name__
    if debug_timing:
        logger.warning(f"{TIMING_LOG_PREFIX} Starting run_and_format_query for {query_type}")

    try:
        with tags_context(product=Product.MAX_AI, team_id=self._team.pk, org_id=self._team.organization_id):
            if insight_id:
                # Including insight ID for insight search
                tag_queries(insight_id=insight_id)

            execute_start = time.time()
            response_dict = self.execute_query(query, execution_mode, debug_timing=debug_timing)
            execute_elapsed = time.time() - execute_start
            if debug_timing:
                logger.warning(f"{TIMING_LOG_PREFIX} execute_query completed in {execute_elapsed:.3f}s")

            try:
                # Attempt to format results using query-specific formatters
                format_start = time.time()
                formatted_results = self._compress_results(query, response_dict, debug_timing=debug_timing)
                format_elapsed = time.time() - format_start
                total_elapsed = time.time() - start_time
                if debug_timing:
                    logger.warning(
                        f"{TIMING_LOG_PREFIX} _compress_results completed in {format_elapsed:.3f}s, "
                        f"total run_and_format_query: {total_elapsed:.3f}s"
                    )
                return formatted_results, False  # No fallback used
            except NotImplementedError:
                # Unsupported query types must propagate to the caller;
                # a dedicated clause is clearer than an isinstance() check
                # inside a broad `except Exception` handler.
                raise
            except Exception:
                # Fallback to raw JSON if formatting fails - ensures robustness
                fallback_start = time.time()
                fallback_results = json.dumps(response_dict["results"], cls=DjangoJSONEncoder, separators=(",", ":"))
                fallback_elapsed = time.time() - fallback_start
                total_elapsed = time.time() - start_time
                if debug_timing:
                    logger.warning(
                        f"{TIMING_LOG_PREFIX} Fallback JSON formatting completed in {fallback_elapsed:.3f}s, "
                        f"total with fallback: {total_elapsed:.3f}s"
                    )
                return fallback_results, True  # Fallback was used
    except Exception:
        # Outer boundary: record total elapsed time on any failure, then re-raise.
        elapsed = time.time() - start_time
        if debug_timing:
            logger.exception(f"{TIMING_LOG_PREFIX} run_and_format_query failed after {elapsed:.3f}s")
        raise
110
154
111
155
@database_sync_to_async(thread_sensitive=False)
def arun_and_format_query(
    self, query: SupportedQueryTypes, execution_mode: Optional[ExecutionMode] = None, debug_timing=False
) -> tuple[str, bool]:
    """
    Async-friendly wrapper around run_and_format_query.

    Delegates to the synchronous implementation on a worker thread via
    database_sync_to_async, optionally logging how long the sync call took
    versus the wrapper overall.

    Args:
        query: The query to execute (one of SupportedQueryTypes).
        execution_mode: Optional execution-mode override passed through.
        debug_timing: When True, emit verbose [QUERY_EXECUTOR] timing logs.

    Returns:
        Tuple of (results_string, used_fallback) from run_and_format_query.

    Raises:
        Exception: Propagated unchanged from the synchronous implementation.
    """
    began = time.time()
    kind = type(query).__name__
    if debug_timing:
        logger.warning(f"{TIMING_LOG_PREFIX} [ASYNC->SYNC] Starting arun_and_format_query for {kind}")

    try:
        inner_began = time.time()
        outcome = self.run_and_format_query(query, execution_mode, debug_timing=debug_timing)
        inner_took = time.time() - inner_began
        overall = time.time() - began

        if debug_timing:
            logger.warning(
                f"{TIMING_LOG_PREFIX} [ASYNC->SYNC] Sync execution took {inner_took:.3f}s, "
                f"async wrapper overhead: {(overall - inner_took) * 1000:.1f}ms, "
                f"total: {overall:.3f}s"
            )
        return outcome
    except Exception:
        failed_after = time.time() - began
        if debug_timing:
            logger.exception(
                f"{TIMING_LOG_PREFIX} [ASYNC->SYNC] arun_and_format_query failed after {failed_after:.3f}s"
            )
        raise
199
+
200
+ def execute_query (
201
+ self , query : SupportedQueryTypes , execution_mode : Optional [ExecutionMode ] = None , debug_timing = False
202
+ ) -> dict :
135
203
"""
136
204
Execute a query and return the response dict.
137
205
@@ -145,6 +213,9 @@ def execute_query(self, query: SupportedQueryTypes, execution_mode: Optional[Exe
145
213
Raises:
146
214
Exception: If query execution fails
147
215
"""
216
+ start_time = time .time ()
217
+ query_type = type (query ).__name__
218
+
148
219
# Set appropriate execution mode based on environment
149
220
if execution_mode is None :
150
221
execution_mode = (
@@ -153,14 +224,27 @@ def execute_query(self, query: SupportedQueryTypes, execution_mode: Optional[Exe
153
224
else ExecutionMode .CALCULATE_BLOCKING_ALWAYS
154
225
)
155
226
227
+ if debug_timing :
228
+ logger .warning (
229
+ f"{ TIMING_LOG_PREFIX } Starting execute_query for { query_type } with mode { execution_mode .value } "
230
+ )
231
+
156
232
try :
157
233
# Execute the query using PostHog's query processing system
234
+ process_start = time .time ()
235
+ if debug_timing :
236
+ logger .warning (f"{ TIMING_LOG_PREFIX } Calling process_query_dict" )
237
+
158
238
results_response = process_query_dict (
159
239
self ._team ,
160
240
query .model_dump (mode = "json" ),
161
241
execution_mode = execution_mode ,
162
242
)
163
243
244
+ process_elapsed = time .time () - process_start
245
+ if debug_timing :
246
+ logger .warning (f"{ TIMING_LOG_PREFIX } process_query_dict completed in { process_elapsed :.3f} s" )
247
+
164
248
# Normalize response to dict format for consistent handling
165
249
if isinstance (results_response , dict ):
166
250
response_dict = results_response
@@ -170,17 +254,52 @@ def execute_query(self, query: SupportedQueryTypes, execution_mode: Optional[Exe
170
254
# Handle async queries that may need polling
171
255
if query_status := response_dict .get ("query_status" ):
172
256
if not query_status ["complete" ]:
257
+ polling_start = time .time ()
258
+ poll_count = 0
259
+ total_wait_ms = 0
260
+
261
+ if debug_timing :
262
+ logger .warning (
263
+ f"{ TIMING_LOG_PREFIX } Query returned incomplete, starting async polling (query_id={ query_status ['id' ]} )"
264
+ )
265
+
173
266
# Poll async query until completion with exponential backoff
174
267
# Total wait time: ~726 seconds with 100ms increments
175
268
for wait_ms in range (100 , 12000 , 100 ):
269
+ poll_count += 1
270
+ total_wait_ms += wait_ms
271
+
272
+ if poll_count % 10 == 0 and debug_timing : # Log every 10 polls
273
+ logger .warning (
274
+ f"{ TIMING_LOG_PREFIX } Polling attempt { poll_count } , total wait: { total_wait_ms / 1000 :.1f} s"
275
+ )
276
+
176
277
sleep (wait_ms / 1000 )
278
+
279
+ status_check_start = time .time ()
177
280
query_status = get_query_status (team_id = self ._team .pk , query_id = query_status ["id" ]).model_dump (
178
281
mode = "json"
179
282
)
283
+ status_check_elapsed = time .time () - status_check_start
284
+
285
+ if status_check_elapsed > 0.5 and debug_timing : # Log slow status checks
286
+ logger .warning (f"{ TIMING_LOG_PREFIX } Slow status check: { status_check_elapsed :.3f} s" )
287
+
180
288
if query_status ["complete" ]:
289
+ polling_elapsed = time .time () - polling_start
290
+ if debug_timing :
291
+ logger .warning (
292
+ f"{ TIMING_LOG_PREFIX } Async query completed after { poll_count } polls, "
293
+ f"total polling time: { polling_elapsed :.3f} s"
294
+ )
181
295
break
182
296
else :
183
297
# Query timed out after maximum wait time
298
+ polling_elapsed = time .time () - polling_start
299
+ if debug_timing :
300
+ logger .error (
301
+ f"{ TIMING_LOG_PREFIX } Query timeout after { poll_count } polls, { polling_elapsed :.3f} s"
302
+ )
184
303
raise APIException (
185
304
"Query hasn't completed in time. It's worth trying again, maybe with a shorter time range."
186
305
)
@@ -195,21 +314,30 @@ def execute_query(self, query: SupportedQueryTypes, execution_mode: Optional[Exe
195
314
response_dict = query_status ["results" ]
196
315
197
316
except (APIException , ExposedHogQLError , HogQLNotImplementedError , ExposedCHQueryError ) as err :
317
+ elapsed = time .time () - start_time
198
318
# Handle known query execution errors with user-friendly messages
199
319
err_message = str (err )
200
320
if isinstance (err , APIException ):
201
321
if isinstance (err .detail , dict ):
202
322
err_message = ", " .join (f"{ key } : { value } " for key , value in err .detail .items ())
203
323
elif isinstance (err .detail , list ):
204
324
err_message = ", " .join (map (str , err .detail ))
325
+ if debug_timing :
326
+ logger .exception (f"{ TIMING_LOG_PREFIX } Query execution failed after { elapsed :.3f} s: { err_message } " )
205
327
raise Exception (f"There was an error running this query: { err_message } " )
206
328
except Exception :
329
+ elapsed = time .time () - start_time
207
330
# Catch-all for unexpected errors during query execution
331
+ if debug_timing :
332
+ logger .exception (f"{ TIMING_LOG_PREFIX } Unknown error during query execution after { elapsed :.3f} s" )
208
333
raise Exception ("There was an unknown error running this query." )
209
334
335
+ total_elapsed = time .time () - start_time
336
+ if debug_timing :
337
+ logger .warning (f"{ TIMING_LOG_PREFIX } execute_query completed successfully in { total_elapsed :.3f} s" )
210
338
return response_dict
211
339
212
def _compress_results(self, query: SupportedQueryTypes, response: dict, debug_timing=False) -> str:
    """
    Format query results using the appropriate formatter for the query type.

    Args:
        query: The executed query (one of SupportedQueryTypes).
        response: Normalized response dict; must contain "results" (and
            "columns" for SQL-style queries).
        debug_timing: When True, emit verbose [QUERY_EXECUTOR] timing logs.

    Returns:
        Compressed/formatted results string produced by the formatter.

    Raises:
        NotImplementedError: If the query type is not supported.
    """
    began = time.time()
    kind = type(query).__name__
    chosen = None

    try:
        # Pick the formatter matching the query type; assistant variants
        # share a formatter with their non-assistant counterparts.
        if isinstance(query, AssistantTrendsQuery | TrendsQuery):
            chosen = "TrendsResultsFormatter"
            formatter = TrendsResultsFormatter(query, response["results"])
        elif isinstance(query, AssistantFunnelsQuery | FunnelsQuery):
            chosen = "FunnelResultsFormatter"
            formatter = FunnelResultsFormatter(query, response["results"], self._team, self._utc_now_datetime)
        elif isinstance(query, AssistantRetentionQuery | RetentionQuery):
            chosen = "RetentionResultsFormatter"
            formatter = RetentionResultsFormatter(query, response["results"])
        elif isinstance(query, AssistantHogQLQuery | HogQLQuery):
            chosen = "SQLResultsFormatter"
            formatter = SQLResultsFormatter(query, response["results"], response["columns"])
        else:
            raise NotImplementedError(f"Unsupported query type: {kind}")

        compressed = formatter.format()

        spent = time.time() - began
        if debug_timing:
            logger.warning(
                f"{TIMING_LOG_PREFIX} {chosen}.format() completed in {spent:.3f}s for {kind}"
            )
        return compressed
    except Exception:
        spent = time.time() - began
        if debug_timing:
            logger.exception(f"{TIMING_LOG_PREFIX} _compress_results failed after {spent:.3f}s for {kind}")
        raise
0 commit comments