zhiminy committed on
Commit
3d31827
·
1 Parent(s): cafc0c6

refine mining mechanism

Files changed (3)
  1. app.py +360 -57
  2. msr.py +437 -745
  3. requirements.txt +2 -0
app.py CHANGED
@@ -3,6 +3,7 @@ from gradio_leaderboard import Leaderboard
3
  import json
4
  import os
5
  import time
 
6
  import requests
7
  from datetime import datetime, timezone, timedelta
8
  from collections import defaultdict
@@ -17,6 +18,7 @@ import plotly.graph_objects as go
17
  from plotly.subplots import make_subplots
18
  from apscheduler.schedulers.background import BackgroundScheduler
19
  from apscheduler.triggers.cron import CronTrigger
 
20
 
21
  # Load environment variables
22
  load_dotenv()
@@ -121,6 +123,284 @@ def normalize_date_format(date_string):
121
  return date_string
122
 
123
 
124
  # =============================================================================
125
  # GITHUB API OPERATIONS
126
  # =============================================================================
@@ -1788,19 +2068,24 @@ def create_monthly_metrics_plot():
1788
  # Create figure with secondary y-axis
1789
  fig = make_subplots(specs=[[{"secondary_y": True}]])
1790
 
1791
- # Define colors for agents (using a color palette)
1792
- colors = [
1793
- '#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
1794
- '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf'
1795
- ]
 
 
1796
 
1797
  agents = metrics['agents']
1798
  months = metrics['months']
1799
  data = metrics['data']
1800
 
 
 
 
1801
  # Add traces for each agent
1802
  for idx, agent_name in enumerate(agents):
1803
- color = colors[idx % len(colors)]
1804
  agent_data = data[agent_name]
1805
 
1806
  # Add line trace for acceptance rate (left y-axis)
@@ -1817,10 +2102,11 @@ def create_monthly_metrics_plot():
1817
  name=agent_name,
1818
  mode='lines+markers',
1819
  line=dict(color=color, width=2),
1820
- marker=dict(size=6),
1821
  legendgroup=agent_name,
1822
- showlegend=True,
1823
- hovertemplate='<b>%{fullData.name}</b><br>' +
 
1824
  'Acceptance Rate: %{y:.2f}%<br>' +
1825
  '<extra></extra>'
1826
  ),
@@ -1841,11 +2127,12 @@ def create_monthly_metrics_plot():
1841
  go.Bar(
1842
  x=x_bars,
1843
  y=y_bars,
1844
- name=f"{agent_name} (Reviews)",
1845
  marker=dict(color=color, opacity=0.6),
1846
  legendgroup=agent_name,
1847
- showlegend=False, # Don't show in legend (already shown for line)
1848
- hovertemplate='<b>%{fullData.name}</b><br>' +
 
1849
  'Total Reviews: %{y}<br>' +
1850
  '<extra></extra>',
1851
  offsetgroup=agent_name # Group bars by agent for proper spacing
@@ -1861,17 +2148,11 @@ def create_monthly_metrics_plot():
1861
  # Update layout
1862
  fig.update_layout(
1863
  title=None,
1864
- hovermode='closest',
1865
  barmode='group',
1866
  height=600,
1867
- legend=dict(
1868
- orientation="h",
1869
- yanchor="bottom",
1870
- y=1.02,
1871
- xanchor="right",
1872
- x=1
1873
- ),
1874
- margin=dict(l=50, r=50, t=100, b=50)
1875
  )
1876
 
1877
  return fig
@@ -1978,17 +2259,21 @@ def submit_agent(identifier, agent_name, organization, description, website):
1978
 
1979
  def fetch_and_update_daily_reviews():
1980
  """
1981
- Fetch and update reviews with comprehensive status checking.
1982
 
1983
  Strategy:
1984
  1. For each agent:
1985
  - Examine ALL open reviews from last LEADERBOARD_TIME_FRAME_DAYS - 1 for their closed_at status
1986
- - Update PR status for all existing metadata (last LEADERBOARD_TIME_FRAME_DAYS - 1)
1987
- - Fetch new reviews from yesterday 12am to today 12am
1988
  - Save all updated/new metadata back to HuggingFace
1989
  """
1990
- tokens = get_github_tokens()
1991
- token_pool = TokenPool(tokens)
 
 
 
 
1992
 
1993
  # Load all agents
1994
  agents = load_agents_from_hf()
@@ -2041,44 +2326,62 @@ def fetch_and_update_daily_reviews():
2041
 
2042
  print(f" βœ“ Loaded {len(recent_metadata)} existing reviews from timeframe")
2043
 
2044
- # Step 2: Examine ALL open reviews for their closed_at status
2045
- # This ensures we capture any reviews that may have been closed/merged since last check
2046
  if recent_metadata:
2047
- print(f"πŸ” Examining {len(recent_metadata)} open reviews for status updates (checking closed_at)...")
2048
- recent_metadata = update_pr_status(recent_metadata, token_pool)
2049
- print(f" βœ“ Updated PR status for existing reviews")
2050
-
2051
- # Step 3: Fetch NEW reviews from yesterday 12am to today 12am
2052
- print(f"πŸ” Fetching new reviews from {yesterday_midnight.isoformat()} to {today_midnight.isoformat()}...")
2053
-
2054
- base_query = f'is:pr review:approved author:{identifier} -is:draft'
2055
- prs_by_url = {}
2056
-
2057
- fetch_reviews_with_time_partition(
2058
- base_query,
2059
- yesterday_midnight,
2060
- today_midnight,
2061
- token_pool,
2062
- prs_by_url,
2063
- debug_limit=None
2064
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
2065
 
2066
  # Extract metadata for new reviews
2067
  yesterday_metadata = []
2068
- for pr_url, pr in prs_by_url.items():
2069
- metadata = extract_review_metadata(pr)
2070
- if metadata:
2071
- metadata['agent_identifier'] = identifier
2072
- yesterday_metadata.append(metadata)
 
 
 
 
 
 
 
2073
 
2074
- print(f" βœ“ Found {len(yesterday_metadata)} new reviews in 24-hour window")
 
 
2075
 
2076
- # Step 4: Update PR status for new reviews
2077
- if yesterday_metadata:
2078
- print(f" Updating PR status for {len(yesterday_metadata)} new reviews...")
2079
- yesterday_metadata = update_pr_status(yesterday_metadata, token_pool)
2080
 
2081
- # Step 5: Combine and save all metadata
2082
  all_updated_metadata = recent_metadata + yesterday_metadata
2083
 
2084
  if all_updated_metadata:
 
3
  import json
4
  import os
5
  import time
6
+ import tempfile
7
  import requests
8
  from datetime import datetime, timezone, timedelta
9
  from collections import defaultdict
 
18
  from plotly.subplots import make_subplots
19
  from apscheduler.schedulers.background import BackgroundScheduler
20
  from apscheduler.triggers.cron import CronTrigger
21
+ from google.cloud import bigquery
22
 
23
  # Load environment variables
24
  load_dotenv()
 
123
  return date_string
124
 
125
 
126
+ # =============================================================================
127
+ # BIGQUERY FUNCTIONS
128
+ # =============================================================================
129
+
130
+ def get_bigquery_client():
131
+ """
132
+ Initialize BigQuery client using credentials from environment variable.
133
+
134
+ Expects GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable containing
135
+ the service account JSON credentials as a string.
136
+ """
137
+ # Get the JSON content from environment variable
138
+ creds_json = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS_JSON')
139
+
140
+ if creds_json:
141
+ # Create a temporary file to store credentials
142
+ with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
143
+ temp_file.write(creds_json)
144
+ temp_path = temp_file.name
145
+
146
+ # Set environment variable to point to temp file
147
+ os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = temp_path
148
+
149
+ # Initialize BigQuery client
150
+ client = bigquery.Client()
151
+
152
+ # Clean up temp file
153
+ os.unlink(temp_path)
154
+
155
+ return client
156
+ else:
157
+ raise ValueError("GOOGLE_APPLICATION_CREDENTIALS_JSON not found in environment")
158
+
159
+
160
+ def fetch_reviews_from_bigquery(client, identifier, start_date, end_date):
161
+ """
162
+ Fetch PR review events from GitHub Archive for a specific agent.
163
+
164
+ Queries githubarchive.day.YYYYMMDD tables for PullRequestReviewEvent where
165
+ actor.login matches the agent identifier.
166
+
167
+ Args:
168
+ client: BigQuery client instance
169
+ identifier: GitHub username or bot identifier (e.g., 'amazon-inspector-beta[bot]')
170
+ start_date: Start datetime (timezone-aware)
171
+ end_date: End datetime (timezone-aware)
172
+
173
+ Returns:
174
+ List of review event rows with PR information
175
+ """
176
+ print(f"\nπŸ” Querying BigQuery for reviews by {identifier}")
177
+ print(f" Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
178
+
179
+ # Generate list of table names for each day in the range
180
+ table_refs = []
181
+ current_date = start_date
182
+ while current_date < end_date:
183
+ table_name = f"githubarchive.day.{current_date.strftime('%Y%m%d')}"
184
+ table_refs.append(table_name)
185
+ current_date += timedelta(days=1)
186
+
187
+ # Build UNION ALL query for all daily tables
188
+ union_parts = []
189
+ for table_name in table_refs:
190
+ union_parts.append(f"""
191
+ SELECT
192
+ repo.name as repo_name,
193
+ actor.login as actor_login,
194
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') as pr_url,
195
+ CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) as pr_number,
196
+ JSON_EXTRACT_SCALAR(payload, '$.review.submitted_at') as reviewed_at,
197
+ created_at
198
+ FROM `{table_name}`
199
+ WHERE type = 'PullRequestReviewEvent'
200
+ AND actor.login = @identifier
201
+ """)
202
+
203
+ query = " UNION ALL ".join(union_parts)
204
+
205
+ job_config = bigquery.QueryJobConfig(
206
+ query_parameters=[
207
+ bigquery.ScalarQueryParameter("identifier", "STRING", identifier)
208
+ ]
209
+ )
210
+
211
+ print(f" Querying {len(table_refs)} daily tables...")
212
+
213
+ try:
214
+ query_job = client.query(query, job_config=job_config)
215
+ results = list(query_job.result())
216
+
217
+ print(f" βœ“ Found {len(results)} review events")
218
+ return results
219
+
220
+ except Exception as e:
221
+ print(f" βœ— BigQuery error: {str(e)}")
222
+ return []
223
+
224
+
225
+ def fetch_pr_status_from_bigquery(client, pr_urls, start_date, end_date):
226
+ """
227
+ Fetch PR status (merged/closed) from GitHub Archive PullRequestEvent.
228
+
229
+ For each PR URL, looks for PullRequestEvent with action='closed' to determine
230
+ if the PR was merged or just closed.
231
+
232
+ Args:
233
+ client: BigQuery client instance
234
+ pr_urls: List of PR URLs to check status for
235
+ start_date: Start datetime (should cover review period and after)
236
+ end_date: End datetime (should be recent/current)
237
+
238
+ Returns:
239
+ Dictionary mapping PR URL to status dict:
240
+ {
241
+ 'pr_url': {
242
+ 'status': 'merged'|'closed'|'open',
243
+ 'merged': bool,
244
+ 'closed_at': timestamp or None
245
+ }
246
+ }
247
+ """
248
+ if not pr_urls:
249
+ return {}
250
+
251
+ print(f"\nπŸ” Querying BigQuery for PR status ({len(pr_urls)} PRs)...")
252
+
253
+ # Extract repo and PR number from URLs
254
+ # URL format: https://github.com/owner/repo/pull/123
255
+ pr_info = []
256
+ for url in pr_urls:
257
+ try:
258
+ parts = url.replace('https://github.com/', '').split('/')
259
+ if len(parts) >= 4:
260
+ owner = parts[0]
261
+ repo = parts[1]
262
+ pr_number = int(parts[3])
263
+ repo_name = f"{owner}/{repo}"
264
+ pr_info.append({
265
+ 'url': url,
266
+ 'repo': repo_name,
267
+ 'number': pr_number
268
+ })
269
+ except Exception as e:
270
+ print(f" Warning: Could not parse PR URL {url}: {e}")
271
+ continue
272
+
273
+ if not pr_info:
274
+ return {}
275
+
276
+ # Build repo filter condition for WHERE clause
277
+ # Group PRs by repo to create efficient filters
278
+ repos_to_prs = defaultdict(list)
279
+ for pr in pr_info:
280
+ repos_to_prs[pr['repo']].append(pr['number'])
281
+
282
+ # Generate list of table names for date range
283
+ # Look back 1 full year from end_date to catch PR close events that may have occurred before reviews
284
+ pr_status_start = end_date - timedelta(days=365)
285
+ table_refs = []
286
+ current_date = pr_status_start
287
+ while current_date < end_date:
288
+ table_name = f"githubarchive.day.{current_date.strftime('%Y%m%d')}"
289
+ table_refs.append(table_name)
290
+ current_date += timedelta(days=1)
291
+
292
+ # Build WHERE clause to filter by specific repos and PR numbers
293
+ # Format: (repo='owner/repo1' AND pr_number IN (1,2,3)) OR (repo='owner/repo2' AND pr_number IN (4,5))
294
+ filter_conditions = []
295
+ for repo, pr_numbers in repos_to_prs.items():
296
+ pr_list = ','.join(map(str, pr_numbers))
297
+ filter_conditions.append(f"(repo.name = '{repo}' AND CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) IN ({pr_list}))")
298
+
299
+ pr_filter = " OR ".join(filter_conditions)
300
+
301
+ # Build query to find close/merge events for specific PRs
302
+ union_parts = []
303
+ for table_name in table_refs:
304
+ union_parts.append(f"""
305
+ SELECT
306
+ repo.name as repo_name,
307
+ CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) as pr_number,
308
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') as pr_url,
309
+ JSON_EXTRACT_SCALAR(payload, '$.action') as action,
310
+ CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.merged') AS BOOL) as merged,
311
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.closed_at') as closed_at,
312
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.merged_at') as merged_at,
313
+ created_at
314
+ FROM `{table_name}`
315
+ WHERE type = 'PullRequestEvent'
316
+ AND JSON_EXTRACT_SCALAR(payload, '$.action') = 'closed'
317
+ AND ({pr_filter})
318
+ """)
319
+
320
+ query = " UNION ALL ".join(union_parts)
321
+
322
+ print(f" Querying {len(table_refs)} daily tables for PR status (1-year lookback: {pr_status_start.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')})...")
323
+ print(f" Filtering for {len(pr_info)} specific PRs across {len(repos_to_prs)} repos")
324
+
325
+ try:
326
+ query_job = client.query(query)
327
+ results = list(query_job.result())
328
+
329
+ print(f" βœ“ Found {len(results)} PR close events")
330
+
331
+ # Build status map by PR URL
332
+ status_map = {}
333
+ for row in results:
334
+ pr_url = row.pr_url
335
+
336
+ merged = row.merged if row.merged is not None else False
337
+ closed_at = row.closed_at or row.merged_at
338
+
339
+ # Convert to ISO format if datetime
340
+ if hasattr(closed_at, 'isoformat'):
341
+ closed_at = closed_at.isoformat()
342
+
343
+ status = 'merged' if merged else 'closed'
344
+
345
+ status_map[pr_url] = {
346
+ 'status': status,
347
+ 'merged': merged,
348
+ 'closed_at': closed_at
349
+ }
350
+
351
+ # Mark remaining PRs as open
352
+ for url in pr_urls:
353
+ if url not in status_map:
354
+ status_map[url] = {
355
+ 'status': 'open',
356
+ 'merged': False,
357
+ 'closed_at': None
358
+ }
359
+
360
+ merged_count = sum(1 for s in status_map.values() if s['merged'])
361
+ closed_count = sum(1 for s in status_map.values() if s['status'] == 'closed')
362
+ open_count = sum(1 for s in status_map.values() if s['status'] == 'open')
363
+
364
+ print(f" Status breakdown: {merged_count} merged, {closed_count} closed, {open_count} open")
365
+
366
+ return status_map
367
+
368
+ except Exception as e:
369
+ print(f" βœ— BigQuery error: {str(e)}")
370
+ # Return all as open on error
371
+ return {url: {'status': 'open', 'merged': False, 'closed_at': None} for url in pr_urls}
372
+
373
+
374
+ def extract_review_metadata_from_bigquery(review_row, status_info):
375
+ """
376
+ Extract minimal PR review metadata from BigQuery row and status info.
377
+
378
+ Args:
379
+ review_row: BigQuery row from PullRequestReviewEvent query
380
+ status_info: Status dictionary from fetch_pr_status_from_bigquery
381
+
382
+ Returns:
383
+ Dictionary with review metadata
384
+ """
385
+ pr_url = review_row.pr_url
386
+ pr_number = review_row.pr_number
387
+ reviewed_at = review_row.reviewed_at or review_row.created_at
388
+
389
+ # Convert to ISO format if datetime
390
+ if hasattr(reviewed_at, 'isoformat'):
391
+ reviewed_at = reviewed_at.isoformat()
392
+
393
+ return {
394
+ 'html_url': pr_url,
395
+ 'reviewed_at': reviewed_at,
396
+ 'pr_status': status_info['status'],
397
+ 'pr_merged': status_info['merged'],
398
+ 'pr_closed_at': status_info['closed_at'],
399
+ 'pr_url': pr_url,
400
+ 'review_id': f"pr_{pr_number}"
401
+ }
402
+
403
+
404
  # =============================================================================
405
  # GITHUB API OPERATIONS
406
  # =============================================================================
 
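The fetch_reviews_from_bigquery helper above builds one SELECT per githubarchive.day.YYYYMMDD table and joins them with UNION ALL. Not part of this commit, but the same scan can be written against BigQuery's wildcard tables with the _TABLE_SUFFIX pseudo-column; a minimal sketch, assuming the query window stays inside a single calendar year (fetch_reviews_wildcard is a hypothetical name):

```python
# Hedged alternative to the per-day UNION ALL: a single wildcard scan over the
# githubarchive daily tables, restricted by _TABLE_SUFFIX (MMDD). Suffix bounds
# are inlined because they come from trusted datetime objects, not user input.
from datetime import timedelta
from google.cloud import bigquery

def fetch_reviews_wildcard(client, identifier, start_date, end_date):
    year = start_date.strftime('%Y')
    last_day = end_date - timedelta(days=1)  # the original loop excludes end_date
    query = f"""
        SELECT
          repo.name AS repo_name,
          actor.login AS actor_login,
          JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') AS pr_url,
          CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) AS pr_number,
          JSON_EXTRACT_SCALAR(payload, '$.review.submitted_at') AS reviewed_at,
          created_at
        FROM `githubarchive.day.{year}*`
        WHERE _TABLE_SUFFIX BETWEEN '{start_date.strftime('%m%d')}' AND '{last_day.strftime('%m%d')}'
          AND type = 'PullRequestReviewEvent'
          AND actor.login = @identifier
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("identifier", "STRING", identifier)]
    )
    return list(client.query(query, job_config=job_config).result())
```

Whether this is cheaper than the explicit UNION ALL depends on wildcard pruning; the commit's per-day table list has the advantage of scanning exactly the requested days.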
2068
  # Create figure with secondary y-axis
2069
  fig = make_subplots(specs=[[{"secondary_y": True}]])
2070
 
2071
+ # Generate unique colors for many agents using HSL color space
2072
+ def generate_color(index, total):
2073
+ """Generate distinct colors using HSL color space for better distribution"""
2074
+ hue = (index * 360 / total) % 360
2075
+ saturation = 70 + (index % 3) * 10 # Vary saturation slightly
2076
+ lightness = 45 + (index % 2) * 10 # Vary lightness slightly
2077
+ return f'hsl({hue}, {saturation}%, {lightness}%)'
2078
 
2079
  agents = metrics['agents']
2080
  months = metrics['months']
2081
  data = metrics['data']
2082
 
2083
+ # Generate colors for all agents
2084
+ agent_colors = {agent: generate_color(idx, len(agents)) for idx, agent in enumerate(agents)}
2085
+
2086
  # Add traces for each agent
2087
  for idx, agent_name in enumerate(agents):
2088
+ color = agent_colors[agent_name]
2089
  agent_data = data[agent_name]
2090
 
2091
  # Add line trace for acceptance rate (left y-axis)
 
2102
  name=agent_name,
2103
  mode='lines+markers',
2104
  line=dict(color=color, width=2),
2105
+ marker=dict(size=8),
2106
  legendgroup=agent_name,
2107
+ showlegend=False, # Hide legend for 70+ agents
2108
+ hovertemplate='<b>Agent: %{fullData.name}</b><br>' +
2109
+ 'Month: %{x}<br>' +
2110
  'Acceptance Rate: %{y:.2f}%<br>' +
2111
  '<extra></extra>'
2112
  ),
 
2127
  go.Bar(
2128
  x=x_bars,
2129
  y=y_bars,
2130
+ name=agent_name,
2131
  marker=dict(color=color, opacity=0.6),
2132
  legendgroup=agent_name,
2133
+ showlegend=False, # Hide legend for 70+ agents
2134
+ hovertemplate='<b>Agent: %{fullData.name}</b><br>' +
2135
+ 'Month: %{x}<br>' +
2136
  'Total Reviews: %{y}<br>' +
2137
  '<extra></extra>',
2138
  offsetgroup=agent_name # Group bars by agent for proper spacing
 
2148
  # Update layout
2149
  fig.update_layout(
2150
  title=None,
2151
+ hovermode='closest', # Show individual agent info on hover
2152
  barmode='group',
2153
  height=600,
2154
+ showlegend=False, # Hide legend for 70+ agents
2155
+ margin=dict(l=50, r=50, t=50, b=50) # Reduced top margin since no legend
 
 
 
 
 
 
2156
  )
2157
 
2158
  return fig
 
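For reference, the generate_color helper introduced in the plotting changes above can be exercised on its own; the agent names below are placeholders, and Plotly accepts the resulting CSS hsl() strings directly as trace colors.

```python
# Minimal illustration of the HSL palette used for the monthly metrics plot:
# hues are spread evenly around the color wheel, with small saturation and
# lightness offsets so adjacent series remain distinguishable.
def generate_color(index, total):
    hue = (index * 360 / total) % 360
    saturation = 70 + (index % 3) * 10
    lightness = 45 + (index % 2) * 10
    return f'hsl({hue}, {saturation}%, {lightness}%)'

agents = ['agent-a', 'agent-b', 'agent-c', 'agent-d']  # placeholder names
agent_colors = {a: generate_color(i, len(agents)) for i, a in enumerate(agents)}
print(agent_colors['agent-b'])  # hsl(90.0, 80%, 55%)
```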
2259
 
2260
  def fetch_and_update_daily_reviews():
2261
  """
2262
+ Fetch and update reviews with comprehensive status checking using BigQuery.
2263
 
2264
  Strategy:
2265
  1. For each agent:
2266
  - Examine ALL open reviews from last LEADERBOARD_TIME_FRAME_DAYS - 1 for their closed_at status
2267
+ - Update PR status for all existing metadata using BigQuery (last LEADERBOARD_TIME_FRAME_DAYS - 1)
2268
+ - Fetch new reviews from yesterday 12am to today 12am using BigQuery
2269
  - Save all updated/new metadata back to HuggingFace
2270
  """
2271
+ # Initialize BigQuery client
2272
+ try:
2273
+ client = get_bigquery_client()
2274
+ except Exception as e:
2275
+ print(f"βœ— Failed to initialize BigQuery client: {str(e)}")
2276
+ return
2277
 
2278
  # Load all agents
2279
  agents = load_agents_from_hf()
 
2326
 
2327
  print(f" βœ“ Loaded {len(recent_metadata)} existing reviews from timeframe")
2328
 
2329
+ # Step 2: Update PR status for existing reviews using BigQuery
 
2330
  if recent_metadata:
2331
+ print(f"πŸ” Updating PR status for {len(recent_metadata)} existing reviews using BigQuery...")
2332
+ # Extract PR URLs from existing metadata
2333
+ pr_urls = [r.get('pr_url') for r in recent_metadata if r.get('pr_url')]
2334
+ if pr_urls:
2335
+ # Fetch status from BigQuery
2336
+ extended_end_date = today_utc
2337
+ status_map = fetch_pr_status_from_bigquery(client, pr_urls, cutoff_date, extended_end_date)
2338
+
2339
+ # Update metadata with new status
2340
+ for review in recent_metadata:
2341
+ pr_url = review.get('pr_url')
2342
+ if pr_url and pr_url in status_map:
2343
+ status_info = status_map[pr_url]
2344
+ review['pr_status'] = status_info['status']
2345
+ review['pr_merged'] = status_info['merged']
2346
+ review['pr_closed_at'] = status_info['closed_at']
2347
+
2348
+ print(f" βœ“ Updated PR status for existing reviews")
2349
+
2350
+ # Step 3: Fetch NEW reviews from yesterday 12am to today 12am using BigQuery
2351
+ print(f"πŸ” Fetching new reviews from {yesterday_midnight.isoformat()} to {today_midnight.isoformat()} using BigQuery...")
2352
+
2353
+ review_rows = fetch_reviews_from_bigquery(client, identifier, yesterday_midnight, today_midnight)
2354
+
2355
+ # Extract unique PR URLs and fetch status
2356
+ pr_urls = list(set([row.pr_url for row in review_rows if row.pr_url]))
2357
+ print(f" Found {len(review_rows)} review events across {len(pr_urls)} unique PRs")
2358
+
2359
+ # Fetch PR status for new reviews
2360
+ extended_end_date = today_utc
2361
+ status_map = fetch_pr_status_from_bigquery(client, pr_urls, yesterday_midnight, extended_end_date)
2362
 
2363
  # Extract metadata for new reviews
2364
  yesterday_metadata = []
2365
+ seen_prs = set()
2366
+ for row in review_rows:
2367
+ pr_url = row.pr_url
2368
+ if pr_url in seen_prs:
2369
+ continue
2370
+ seen_prs.add(pr_url)
2371
+
2372
+ status_info = status_map.get(pr_url, {
2373
+ 'status': 'open',
2374
+ 'merged': False,
2375
+ 'closed_at': None
2376
+ })
2377
 
2378
+ metadata = extract_review_metadata_from_bigquery(row, status_info)
2379
+ metadata['agent_identifier'] = identifier
2380
+ yesterday_metadata.append(metadata)
2381
 
2382
+ print(f" βœ“ Found {len(yesterday_metadata)} unique PRs in 24-hour window")
 
 
 
2383
 
2384
+ # Step 4: Combine and save all metadata
2385
  all_updated_metadata = recent_metadata + yesterday_metadata
2386
 
2387
  if all_updated_metadata:
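Taken together, the app.py changes swap the GitHub Search API mining for BigQuery queries over GitHub Archive. A condensed sketch of the resulting daily job, using the helper names from this diff; loading the existing metadata and the final save step are passed in as stand-ins because they fall outside the excerpt:

```python
# Sketch only: get_bigquery_client, fetch_pr_status_from_bigquery,
# fetch_reviews_from_bigquery and extract_review_metadata_from_bigquery are the
# functions added in app.py above; recent_metadata/save_fn stand in for the
# HuggingFace load/save steps not shown here.
from datetime import datetime, timedelta, timezone

def daily_update_sketch(identifier, recent_metadata, save_fn):
    client = get_bigquery_client()  # credentials taken from the env JSON

    now = datetime.now(timezone.utc)
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday_midnight = today_midnight - timedelta(days=1)

    # Step 1: refresh merged/closed/open status of already-known reviews.
    known_urls = [r['pr_url'] for r in recent_metadata if r.get('pr_url')]
    status_map = fetch_pr_status_from_bigquery(client, known_urls, yesterday_midnight, now)
    for review in recent_metadata:
        info = status_map.get(review.get('pr_url'))
        if info:
            review['pr_status'] = info['status']
            review['pr_merged'] = info['merged']
            review['pr_closed_at'] = info['closed_at']

    # Step 2: pull yesterday's review events and keep one record per PR.
    rows = fetch_reviews_from_bigquery(client, identifier, yesterday_midnight, today_midnight)
    new_urls = list({row.pr_url for row in rows if row.pr_url})
    new_status = fetch_pr_status_from_bigquery(client, new_urls, yesterday_midnight, now)

    seen, new_metadata = set(), []
    for row in rows:
        if not row.pr_url or row.pr_url in seen:
            continue
        seen.add(row.pr_url)
        info = new_status.get(row.pr_url, {'status': 'open', 'merged': False, 'closed_at': None})
        meta = extract_review_metadata_from_bigquery(row, info)
        meta['agent_identifier'] = identifier
        new_metadata.append(meta)

    # Step 3: persist combined metadata (e.g. via save_review_metadata_to_hf).
    save_fn(recent_metadata + new_metadata, identifier)
```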
msr.py CHANGED
@@ -1,17 +1,16 @@
1
  """
2
  Minimalist Review Metadata Mining Script
3
- Mines PR review metadata from GitHub and saves to HuggingFace dataset.
4
  """
5
 
6
  import json
7
  import os
8
- import time
9
- import requests
10
  from datetime import datetime, timezone, timedelta
11
  from collections import defaultdict
12
  from huggingface_hub import HfApi, hf_hub_download
13
  from dotenv import load_dotenv
14
- import random
15
 
16
  # Load environment variables
17
  load_dotenv()
@@ -52,792 +51,501 @@ def save_jsonl(filename, data):
52
  f.write(json.dumps(item) + '\n')
53
 
54
 
55
- def get_github_tokens():
56
- """Get all GitHub tokens from environment variables (all vars starting with GITHUB_TOKEN)."""
57
- tokens = []
58
- for key, value in os.environ.items():
59
- if key.startswith('GITHUB_TOKEN') and value:
60
- tokens.append(value)
61
-
62
- if not tokens:
63
- print("Warning: No GITHUB_TOKEN found. API rate limits: 60/hour (authenticated: 5000/hour)")
64
- else:
65
- print(f"βœ“ Loaded {len(tokens)} GitHub token(s) for rotation")
66
-
67
- return tokens
68
 
69
 
70
- class TokenPool:
71
  """
72
- Hybrid token pool with parallel execution and round-robin fallback.
73
 
74
- Splits tokens into two pools:
75
- - Parallel pool (50%): For concurrent API calls to maximize throughput
76
- - Round-robin pool (50%): Backup pool for rate limit fallback
77
-
78
- Features:
79
- - Automatic fallback when parallel tokens hit rate limits
80
- - Rate limit tracking with timestamp-based recovery
81
- - Thread-safe token management
82
- - Real-time statistics monitoring
83
  """
84
- def __init__(self, tokens):
85
- import threading
86
-
87
- self.all_tokens = tokens if tokens else [None]
88
- self.lock = threading.Lock()
89
-
90
- # Split tokens into parallel and round-robin pools (50/50)
91
- total_tokens = len(self.all_tokens)
92
- split_point = max(1, total_tokens // 2)
93
-
94
- self.parallel_tokens = self.all_tokens[:split_point]
95
- self.roundrobin_tokens = self.all_tokens[split_point:] if total_tokens > 1 else self.all_tokens
96
-
97
- # Round-robin index for fallback pool
98
- self.roundrobin_index = 0
99
-
100
- # Rate limit tracking: {token: reset_timestamp}
101
- self.parallel_rate_limited = set()
102
- self.roundrobin_rate_limited = set()
103
- self.rate_limit_resets = {}
104
-
105
- # Statistics
106
- self.stats = {
107
- 'parallel_calls': 0,
108
- 'roundrobin_calls': 0,
109
- 'fallback_triggers': 0
110
- }
111
-
112
- print(f"πŸ“Š Token Pool Initialized:")
113
- print(f" Total tokens: {total_tokens}")
114
- print(f" Parallel pool: {len(self.parallel_tokens)} tokens")
115
- print(f" Round-robin pool: {len(self.roundrobin_tokens)} tokens")
116
-
117
- def _cleanup_expired_rate_limits(self):
118
- """Remove tokens from rate-limited sets if their reset time has passed."""
119
- current_time = time.time()
120
- expired_tokens = [
121
- token for token, reset_time in self.rate_limit_resets.items()
122
- if current_time >= reset_time
123
- ]
124
-
125
- for token in expired_tokens:
126
- self.parallel_rate_limited.discard(token)
127
- self.roundrobin_rate_limited.discard(token)
128
- del self.rate_limit_resets[token]
129
- if expired_tokens:
130
- print(f" βœ“ Recovered {len(expired_tokens)} token(s) from rate limit")
131
-
132
- def get_parallel_token(self):
133
- """Get an available token from the parallel pool."""
134
- with self.lock:
135
- self._cleanup_expired_rate_limits()
136
-
137
- # Find first non-rate-limited parallel token
138
- for token in self.parallel_tokens:
139
- if token not in self.parallel_rate_limited:
140
- self.stats['parallel_calls'] += 1
141
- return token
142
-
143
- return None
144
-
145
- def get_roundrobin_token(self):
146
- """Get the next available token from round-robin pool."""
147
- with self.lock:
148
- self._cleanup_expired_rate_limits()
149
-
150
- # Try all tokens in round-robin order
151
- attempts = 0
152
- while attempts < len(self.roundrobin_tokens):
153
- token = self.roundrobin_tokens[self.roundrobin_index]
154
- self.roundrobin_index = (self.roundrobin_index + 1) % len(self.roundrobin_tokens)
155
-
156
- if token not in self.roundrobin_rate_limited:
157
- self.stats['roundrobin_calls'] += 1
158
- return token
159
-
160
- attempts += 1
161
-
162
- return None
163
-
164
- def get_next_token(self):
165
- """
166
- Get next available token, trying parallel pool first, then falling back to round-robin.
167
-
168
- Returns:
169
- Token string or None if all tokens are rate-limited
170
- """
171
- # Try parallel pool first
172
- token = self.get_parallel_token()
173
- if token:
174
- return token
175
-
176
- # Fallback to round-robin pool
177
- with self.lock:
178
- self.stats['fallback_triggers'] += 1
179
-
180
- token = self.get_roundrobin_token()
181
- if not token:
182
- print(" ⚠️ All tokens are rate-limited, waiting...")
183
 
184
- return token
 
 
 
 
185
 
186
- def get_headers(self):
187
- """Get headers with the next available token."""
188
- token = self.get_next_token()
189
- return {'Authorization': f'token {token}'} if token else {}
190
 
191
- def mark_rate_limited(self, token, reset_timestamp=None):
192
- """
193
- Mark a token as rate-limited with optional reset timestamp.
194
 
195
- Args:
196
- token: The token to mark as rate-limited
197
- reset_timestamp: Unix timestamp when rate limit resets (optional)
198
- """
199
- if not token:
200
- return
201
-
202
- with self.lock:
203
- # Determine which pool the token belongs to
204
- if token in self.parallel_tokens:
205
- self.parallel_rate_limited.add(token)
206
- if token in self.roundrobin_tokens:
207
- self.roundrobin_rate_limited.add(token)
208
-
209
- # Store reset timestamp if provided
210
- if reset_timestamp:
211
- self.rate_limit_resets[token] = reset_timestamp
212
- from datetime import datetime, timezone
213
- reset_time = datetime.fromtimestamp(reset_timestamp, tz=timezone.utc)
214
- print(f" ⏰ Token rate-limited until {reset_time.strftime('%H:%M:%S')} UTC")
215
-
216
- def get_available_parallel_tokens(self):
217
- """Get list of all available (non-rate-limited) parallel tokens."""
218
- with self.lock:
219
- self._cleanup_expired_rate_limits()
220
- return [t for t in self.parallel_tokens if t not in self.parallel_rate_limited]
221
-
222
- def get_stats(self):
223
- """Get token pool usage statistics."""
224
- with self.lock:
225
- return {
226
- 'parallel_calls': self.stats['parallel_calls'],
227
- 'roundrobin_calls': self.stats['roundrobin_calls'],
228
- 'fallback_triggers': self.stats['fallback_triggers'],
229
- 'parallel_rate_limited': len(self.parallel_rate_limited),
230
- 'roundrobin_rate_limited': len(self.roundrobin_rate_limited)
231
- }
232
 
233
- def print_stats(self):
234
- """Print token pool usage statistics."""
235
- stats = self.get_stats()
236
- total_calls = stats['parallel_calls'] + stats['roundrobin_calls']
237
-
238
- print(f"\nπŸ“Š Token Pool Statistics:")
239
- print(f" Total API calls: {total_calls}")
240
- if total_calls > 0:
241
- print(f" Parallel calls: {stats['parallel_calls']} ({stats['parallel_calls']/total_calls*100:.1f}%)")
242
- print(f" Round-robin calls: {stats['roundrobin_calls']} ({stats['roundrobin_calls']/total_calls*100:.1f}%)")
243
- print(f" Fallback triggers: {stats['fallback_triggers']}")
244
- print(f" Currently rate-limited: {stats['parallel_rate_limited']} parallel, {stats['roundrobin_rate_limited']} round-robin")
245
-
246
-
247
- def get_hf_token():
248
- """Get HuggingFace token from environment variables."""
249
- token = os.getenv('HF_TOKEN')
250
- if not token:
251
- print("Warning: HF_TOKEN not found in environment variables")
252
- return token
253
 
254
 
255
  # =============================================================================
256
- # GITHUB API FUNCTIONS
257
  # =============================================================================
258
 
259
- def request_with_backoff(method, url, *, headers=None, params=None, json_body=None, data=None, max_retries=10, timeout=30, token_pool=None, token=None):
260
  """
261
- Perform an HTTP request with exponential backoff and jitter for GitHub API.
262
- Retries on 403/429 (rate limits), 5xx server errors, and transient network exceptions.
 
 
263
 
264
  Args:
265
- token_pool: Optional TokenPool instance for rate limit tracking
266
- token: Optional token string to mark as rate-limited if 403/429 occurs
 
 
267
 
268
- Returns the final requests.Response on success or non-retryable status, or None after exhausting retries.
 
269
  """
270
- delay = 1.0
271
- for attempt in range(max_retries):
272
- try:
273
- resp = requests.request(
274
- method,
275
- url,
276
- headers=headers or {},
277
- params=params,
278
- json=json_body,
279
- data=data,
280
- timeout=timeout
281
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
282
 
283
- status = resp.status_code
284
-
285
- # Success
286
- if 200 <= status < 300:
287
- return resp
288
-
289
- # Rate limits or server errors -> retry with backoff
290
- if status in (403, 429) or 500 <= status < 600:
291
- wait = None
292
- reset_timestamp = None
293
-
294
- # Prefer Retry-After when present
295
- retry_after = resp.headers.get('Retry-After') or resp.headers.get('retry-after')
296
- if retry_after:
297
- try:
298
- wait = float(retry_after)
299
- except Exception:
300
- wait = None
301
-
302
- # Fallback to X-RateLimit-Reset when 403/429
303
- if wait is None and status in (403, 429):
304
- reset_hdr = resp.headers.get('X-RateLimit-Reset') or resp.headers.get('x-ratelimit-reset')
305
- if reset_hdr:
306
- try:
307
- reset_timestamp = int(float(reset_hdr))
308
- wait = max(reset_timestamp - time.time() + 2, 1)
309
- except Exception:
310
- wait = None
311
-
312
- # Mark token as rate-limited if we have token pool and token
313
- if status in (403, 429) and token_pool and token:
314
- token_pool.mark_rate_limited(token, reset_timestamp)
315
-
316
- # Final fallback: exponential backoff with jitter
317
- if wait is None:
318
- wait = delay + random.uniform(0, 0.5)
319
-
320
- # Cap individual wait to avoid extreme sleeps
321
- wait = max(1.0, min(wait, 120.0))
322
- print(f"GitHub API {status}. Backing off {wait:.1f}s (attempt {attempt + 1}/{max_retries})...")
323
- time.sleep(wait)
324
- delay = min(delay * 2, 60.0)
325
- continue
326
 
327
- # Non-retryable error; return response for caller to handle
328
- return resp
 
329
 
330
- except requests.RequestException as e:
331
- # Network error -> retry with backoff
332
- wait = delay + random.uniform(0, 0.5)
333
- wait = max(1.0, min(wait, 60.0))
334
- print(f"Request error: {e}. Retrying in {wait:.1f}s (attempt {attempt + 1}/{max_retries})...")
335
- time.sleep(wait)
336
- delay = min(delay * 2, 60.0)
337
 
338
- print(f"Exceeded max retries for {url}")
339
- return None
 
340
 
341
 
342
- def fetch_reviews_with_time_partition(base_query, start_date, end_date, token_pool, prs_by_url, depth=0):
343
  """
344
- Fetch reviews within a specific time range using time-based partitioning.
345
- Recursively splits the time range if hitting the 1000-result limit.
346
- Supports splitting by day, hour, minute, and second as needed.
347
 
348
- Returns the number of reviews found in this time partition.
349
- """
350
- # Calculate time difference
351
- time_diff = end_date - start_date
352
- total_seconds = time_diff.total_seconds()
353
-
354
- # Determine granularity and format dates accordingly
355
- if total_seconds >= 86400: # >= 1 day
356
- # Use day granularity (YYYY-MM-DD)
357
- start_str = start_date.strftime('%Y-%m-%d')
358
- end_str = end_date.strftime('%Y-%m-%d')
359
- elif total_seconds >= 3600: # >= 1 hour but < 1 day
360
- # Use hour granularity (YYYY-MM-DDTHH:MM:SSZ)
361
- start_str = start_date.strftime('%Y-%m-%dT%H:00:00Z')
362
- end_str = end_date.strftime('%Y-%m-%dT%H:59:59Z')
363
- elif total_seconds >= 60: # >= 1 minute but < 1 hour
364
- # Use minute granularity (YYYY-MM-DDTHH:MM:SSZ)
365
- start_str = start_date.strftime('%Y-%m-%dT%H:%M:00Z')
366
- end_str = end_date.strftime('%Y-%m-%dT%H:%M:59Z')
367
- else: # < 1 minute
368
- # Use second granularity (YYYY-MM-DDTHH:MM:SSZ)
369
- start_str = start_date.strftime('%Y-%m-%dT%H:%M:%SZ')
370
- end_str = end_date.strftime('%Y-%m-%dT%H:%M:%SZ')
371
-
372
- # Add date range to query (use created for PR search)
373
- query = f'{base_query} created:{start_str}..{end_str}'
374
-
375
- indent = " " + " " * depth
376
- print(f"{indent}Searching range {start_str} to {end_str}...")
377
-
378
- page = 1
379
- per_page = 100
380
- total_in_partition = 0
381
-
382
- while True:
383
- url = 'https://api.github.com/search/issues' # Use issues endpoint for PR search
384
- params = {
385
- 'q': query,
386
- 'per_page': per_page,
387
- 'page': page,
388
- 'sort': 'created',
389
- 'order': 'asc'
390
  }
391
- token = token_pool.get_next_token()
392
- headers = {'Authorization': f'token {token}'} if token else {}
 
393
 
394
- try:
395
- response = request_with_backoff('GET', url, headers=headers, params=params, token_pool=token_pool, token=token)
396
- if response is None:
397
- print(f"{indent} Error: retries exhausted for range {start_str} to {end_str}")
398
- return total_in_partition
399
-
400
- if response.status_code != 200:
401
- print(f"{indent} Error: HTTP {response.status_code} for range {start_str} to {end_str}")
402
- return total_in_partition
403
-
404
- data = response.json()
405
- total_count = data.get('total_count', 0)
406
- items = data.get('items', [])
407
-
408
- if not items:
409
- break
410
-
411
- # Add PR reviews to global dict (keyed by PR URL)
412
- for pr in items:
413
- pr_url = pr.get('html_url')
414
- if pr_url and pr_url not in prs_by_url:
415
- prs_by_url[pr_url] = pr
416
- total_in_partition += 1
417
-
418
- # Check if we hit the 1000-result limit
419
- if total_count > 1000 and page == 10:
420
- print(f"{indent} ⚠️ Hit 1000-result limit ({total_count} total). Splitting time range...")
421
-
422
- # Determine how to split based on time range duration
423
- if total_seconds < 2: # Less than 2 seconds - can't split further
424
- print(f"{indent} ⚠️ Cannot split further (range < 2 seconds). Some results may be missing.")
425
- break
426
-
427
- elif total_seconds < 120: # Less than 2 minutes - split by seconds
428
- num_splits = min(4, max(2, int(total_seconds / 30)))
429
- split_duration = time_diff / num_splits
430
- split_dates = [start_date + split_duration * i for i in range(num_splits + 1)]
431
-
432
- total_from_splits = 0
433
- for i in range(num_splits):
434
- split_start = split_dates[i]
435
- split_end = split_dates[i + 1]
436
- if i > 0:
437
- split_start = split_start + timedelta(seconds=1)
438
-
439
- count = fetch_reviews_with_time_partition(
440
- base_query, split_start, split_end, token_pool, prs_by_url, depth + 1
441
- )
442
- total_from_splits += count
443
-
444
- return total_from_splits
445
-
446
- elif total_seconds < 7200: # Less than 2 hours - split by minutes
447
- num_splits = min(4, max(2, int(total_seconds / 1800)))
448
- split_duration = time_diff / num_splits
449
- split_dates = [start_date + split_duration * i for i in range(num_splits + 1)]
450
-
451
- total_from_splits = 0
452
- for i in range(num_splits):
453
- split_start = split_dates[i]
454
- split_end = split_dates[i + 1]
455
- if i > 0:
456
- split_start = split_start + timedelta(minutes=1)
457
-
458
- count = fetch_reviews_with_time_partition(
459
- base_query, split_start, split_end, token_pool, prs_by_url, depth + 1
460
- )
461
- total_from_splits += count
462
-
463
- return total_from_splits
464
-
465
- elif total_seconds < 172800: # Less than 2 days - split by hours
466
- num_splits = min(4, max(2, int(total_seconds / 43200)))
467
- split_duration = time_diff / num_splits
468
- split_dates = [start_date + split_duration * i for i in range(num_splits + 1)]
469
-
470
- total_from_splits = 0
471
- for i in range(num_splits):
472
- split_start = split_dates[i]
473
- split_end = split_dates[i + 1]
474
- if i > 0:
475
- split_start = split_start + timedelta(hours=1)
476
-
477
- count = fetch_reviews_with_time_partition(
478
- base_query, split_start, split_end, token_pool, prs_by_url, depth + 1
479
- )
480
- total_from_splits += count
481
-
482
- return total_from_splits
483
-
484
- else: # 2+ days - split by days
485
- days_diff = time_diff.days
486
-
487
- # Use aggressive splitting for large ranges or deep recursion
488
- if days_diff > 30 or depth > 5:
489
- # Split into 4 parts for more aggressive partitioning
490
- quarter_diff = time_diff / 4
491
- split_dates = [
492
- start_date,
493
- start_date + quarter_diff,
494
- start_date + quarter_diff * 2,
495
- start_date + quarter_diff * 3,
496
- end_date
497
- ]
498
-
499
- total_from_splits = 0
500
- for i in range(4):
501
- split_start = split_dates[i]
502
- split_end = split_dates[i + 1]
503
- if i > 0:
504
- split_start = split_start + timedelta(days=1)
505
-
506
- count = fetch_reviews_with_time_partition(
507
- base_query, split_start, split_end, token_pool, prs_by_url, depth + 1
508
- )
509
- total_from_splits += count
510
-
511
- return total_from_splits
512
- else:
513
- # Binary split for smaller ranges
514
- mid_date = start_date + time_diff / 2
515
-
516
- count1 = fetch_reviews_with_time_partition(
517
- base_query, start_date, mid_date, token_pool, prs_by_url, depth + 1
518
- )
519
- count2 = fetch_reviews_with_time_partition(
520
- base_query, mid_date + timedelta(days=1), end_date, token_pool, prs_by_url, depth + 1
521
- )
522
-
523
- return count1 + count2
524
-
525
- # Normal pagination: check if there are more pages
526
- if len(items) < per_page or page >= 10:
527
- break
528
-
529
- page += 1
530
- time.sleep(0.5) # Courtesy delay between pages
531
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
532
  except Exception as e:
533
- print(f"{indent} Error fetching range {start_str} to {end_str}: {str(e)}")
534
- return total_in_partition
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
535
 
536
- if total_in_partition > 0:
537
- print(f"{indent} βœ“ Found {total_in_partition} reviews in range {start_str} to {end_str}")
 
538
 
539
- return total_in_partition
540
 
 
 
 
 
541
 
542
- def fetch_reviews_parallel(query_patterns, start_date, end_date, token_pool, prs_by_url):
543
- """
544
- Fetch reviews for multiple query patterns in parallel using available parallel tokens.
545
 
546
- This function uses ThreadPoolExecutor to execute multiple query patterns concurrently,
547
- with each pattern using a dedicated token from the parallel pool. Falls back to
548
- sequential execution if insufficient parallel tokens are available.
549
 
550
- Args:
551
- query_patterns: List of query pattern strings (e.g., ['is:pr author:bot1', 'is:pr reviewed-by:bot1'])
552
- start_date: Start datetime for time range
553
- end_date: End datetime for time range
554
- token_pool: TokenPool instance for token management
555
- prs_by_url: Dictionary to collect PRs by URL (shared across patterns)
556
 
557
- Returns:
558
- Total number of PRs found across all patterns
559
- """
560
- from concurrent.futures import ThreadPoolExecutor, as_completed
561
- import threading
562
-
563
- # Check how many parallel tokens are available
564
- available_tokens = token_pool.get_available_parallel_tokens()
565
-
566
- if len(available_tokens) < 2 or len(query_patterns) < 2:
567
- # Not enough tokens or patterns for parallelization, use sequential
568
- print(f" ⚠️ Sequential execution: {len(available_tokens)} parallel tokens available for {len(query_patterns)} patterns")
569
- total_found = 0
570
- for pattern in query_patterns:
571
- pattern_prs = {}
572
- count = fetch_reviews_with_time_partition(
573
- pattern, start_date, end_date, token_pool, pattern_prs, depth=0
574
- )
575
- # Merge pattern results into global dict
576
- lock = threading.Lock()
577
- with lock:
578
- for url, pr in pattern_prs.items():
579
- if url not in prs_by_url:
580
- prs_by_url[url] = pr
581
- total_found += count
582
- return total_found
583
-
584
- # Use parallel execution
585
- print(f" πŸš€ Parallel execution: {len(available_tokens)} parallel tokens for {len(query_patterns)} patterns")
586
-
587
- # Thread-safe lock for updating prs_by_url
588
- lock = threading.Lock()
589
-
590
- def fetch_pattern(pattern):
591
- """Fetch reviews for a single pattern (runs in parallel)."""
592
- pattern_prs = {}
593
- try:
594
- count = fetch_reviews_with_time_partition(
595
- pattern, start_date, end_date, token_pool, pattern_prs, depth=0
596
- )
597
- return pattern, pattern_prs, count
598
- except Exception as e:
599
- print(f" Error fetching pattern '{pattern}': {str(e)}")
600
- return pattern, {}, 0
601
-
602
- # Execute patterns in parallel
603
- max_workers = min(len(query_patterns), len(available_tokens))
604
- total_found = 0
605
-
606
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
607
- # Submit all patterns
608
- future_to_pattern = {
609
- executor.submit(fetch_pattern, pattern): pattern
610
- for pattern in query_patterns
611
- }
612
 
613
- # Collect results as they complete
614
- for future in as_completed(future_to_pattern):
615
- pattern = future_to_pattern[future]
616
- try:
617
- _, pattern_prs, count = future.result()
 
 
 
618
 
619
- # Merge results into global dict (thread-safe)
620
- with lock:
621
- for url, pr in pattern_prs.items():
622
- if url not in prs_by_url:
623
- prs_by_url[url] = pr
624
 
625
- total_found += count
626
- print(f" βœ“ Pattern '{pattern}' completed: {count} PRs found")
627
 
628
- except Exception as e:
629
- print(f" βœ— Pattern '{pattern}' failed: {str(e)}")
630
 
631
- return total_found
 
 
 
632
 
633
 
634
- def extract_review_metadata(pr):
635
  """
636
- Extract minimal PR review metadata for efficient storage.
637
- Only keeps essential fields: html_url, reviewed_at, pr_status, pr_merged, pr_closed_at.
638
 
639
- PR status:
640
- - pr_status: 'open', 'merged', or 'closed'
641
- - pr_merged: True if PR was merged, False otherwise
642
- - pr_closed_at: Date when PR was closed/merged (if applicable)
 
 
643
  """
644
- pr_url = pr.get('html_url')
645
- pr_number = pr.get('number')
646
- created_at = pr.get('created_at')
647
- closed_at = pr.get('closed_at')
648
- state = pr.get('state', 'open') # open or closed
649
-
650
- # Check if PR has pull_request field (indicates it's a PR, not an issue)
651
- pull_request_data = pr.get('pull_request', {})
652
- pr_merged = pull_request_data.get('merged_at') is not None if pull_request_data else False
653
-
654
- # Determine initial status
655
- if pr_merged:
656
- status = 'merged'
657
- elif state == 'closed':
658
- status = 'closed'
659
- else:
660
- status = 'open'
661
 
662
  return {
663
  'html_url': pr_url,
664
- 'reviewed_at': created_at, # When the PR was created (agent reviewed it)
665
- 'pr_status': status,
666
- 'pr_merged': pr_merged,
667
- 'pr_closed_at': closed_at,
668
- 'pr_url': pr_url, # Store PR URL for tracking
669
- 'review_id': f"pr_{pr_number}" # Use PR number for deduplication
670
  }
671
 
672
 
673
- def update_pr_status(metadata_list, token_pool):
674
  """
675
- Update PR status for reviews to get current merged/closed state.
676
-
677
- For each PR associated with a review, fetch current status from GitHub API.
678
- Updates metadata_list in-place with PR status information.
679
 
680
  Args:
681
- metadata_list: List of review metadata dictionaries
682
- token_pool: TokenPool instance for rotating tokens
683
 
684
  Returns:
685
- Updated metadata_list with current PR status
686
  """
687
- if not metadata_list:
688
- return metadata_list
 
 
 
 
689
 
690
- # Track unique PRs to avoid duplicate API calls
691
- pr_url_to_status = {}
692
- updated_count = 0
 
693
 
694
- for metadata in metadata_list:
695
- pr_url = metadata.get('pr_url')
696
- if not pr_url:
697
- continue
698
 
699
- # Skip if already fetched for this PR
700
- if pr_url in pr_url_to_status:
701
- status_info = pr_url_to_status[pr_url]
702
- metadata['pr_status'] = status_info['status']
703
- metadata['pr_merged'] = status_info['merged']
704
- metadata['pr_closed_at'] = status_info['closed_at']
705
- continue
706
 
707
- try:
708
- # Convert HTML URL to API URL
709
- # https://github.com/owner/repo/pull/123 -> https://api.github.com/repos/owner/repo/pulls/123
710
- parts = pr_url.replace('https://github.com/', '').split('/')
711
- if len(parts) >= 4:
712
- owner, repo, pull_word, pr_number = parts[0], parts[1], parts[2], parts[3]
713
- api_url = f'https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}'
714
-
715
- token = token_pool.get_next_token()
716
- headers = {'Authorization': f'token {token}'} if token else {}
717
- response = request_with_backoff('GET', api_url, headers=headers, max_retries=3, token_pool=token_pool, token=token)
718
-
719
- if response and response.status_code == 200:
720
- pr_data = response.json()
721
- state = pr_data.get('state', 'open')
722
- merged = pr_data.get('merged', False)
723
- closed_at = pr_data.get('closed_at')
724
- merged_at = pr_data.get('merged_at')
725
-
726
- # Determine final status
727
- if merged:
728
- status = 'merged'
729
- elif state == 'closed':
730
- status = 'closed'
731
- else:
732
- status = 'open'
733
-
734
- status_info = {
735
- 'status': status,
736
- 'merged': merged,
737
- 'closed_at': closed_at or merged_at
738
- }
739
-
740
- # Cache and update
741
- pr_url_to_status[pr_url] = status_info
742
- metadata['pr_status'] = status
743
- metadata['pr_merged'] = merged
744
- metadata['pr_closed_at'] = closed_at or merged_at
745
- updated_count += 1
746
-
747
- # Small delay to avoid rate limiting
748
- time.sleep(0.1)
749
 
750
- except Exception as e:
751
- print(f" Warning: Could not check PR status for {pr_url}: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
752
  continue
 
753
 
754
- if updated_count > 0:
755
- print(f" βœ“ Updated status for {updated_count} unique PRs")
 
 
 
756
 
757
- return metadata_list
 
758
 
 
759
 
760
- def fetch_all_reviews_metadata(identifier, agent_name, token_pool):
761
- """
762
- Fetch PR reviews associated with a GitHub user or bot for the past LEADERBOARD_TIME_FRAME_DAYS.
763
- Returns lightweight metadata instead of full review objects.
764
 
765
- This function employs time-based partitioning to navigate GitHub's 1000-result limit per query.
766
- It searches using the query pattern:
767
- - reviewed-by:{identifier} (PR reviews by the agent)
768
 
769
- After fetching reviews, it updates PR status to determine if PRs were merged or closed.
 
 
 
770
 
771
  Args:
772
- identifier: GitHub username or bot identifier
773
- agent_name: Human-readable name of the agent for metadata purposes
774
- token_pool: TokenPool instance for rotating tokens
775
 
776
  Returns:
777
- List of dictionaries containing minimal PR review metadata with PR status
 
 
 
 
778
  """
 
 
779
 
780
- # Define query pattern for PR reviews
781
- query_patterns = [f'is:pr reviewed-by:{identifier}']
782
-
783
- # Use a dict to deduplicate PRs by URL
784
- prs_by_url = {}
 
785
 
786
  # Define time range: past LEADERBOARD_TIME_FRAME_DAYS (excluding today)
787
  current_time = datetime.now(timezone.utc)
788
- end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0) # 12:00 AM UTC today
789
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
790
 
791
- print(f"\nπŸ” Searching for PRs reviewed by {identifier}")
792
- print(f" Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')} (today excluded)")
793
- print(f" Query patterns: {len(query_patterns)}")
794
-
795
- overall_start_time = time.time()
796
-
797
- # Use parallel execution if multiple patterns and sufficient tokens
798
- if len(query_patterns) > 1:
799
- reviews_found = fetch_reviews_parallel(
800
- query_patterns,
801
- start_date,
802
- end_date,
803
- token_pool,
804
- prs_by_url
805
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
806
  else:
807
- # Single pattern, use sequential
808
- reviews_found = fetch_reviews_with_time_partition(
809
- query_patterns[0],
810
- start_date,
811
- end_date,
812
- token_pool,
813
- prs_by_url
814
- )
815
 
816
- overall_duration = time.time() - overall_start_time
817
- print(f" βœ“ All patterns complete: {len(prs_by_url)} unique PRs found")
818
- print(f" ⏱️ Total time: {overall_duration:.1f} seconds")
 
 
819
 
820
- all_prs = list(prs_by_url.values())
 
 
 
 
821
 
822
- print(f"\nβœ… COMPLETE: Found {len(all_prs)} unique PRs reviewed by {identifier}")
823
- print(f"πŸ“¦ Extracting minimal metadata and updating PR status...")
824
 
825
- # Extract metadata for each PR review
826
- metadata_list = [extract_review_metadata(pr) for pr in all_prs]
 
 
 
 
 
 
827
 
828
- # Update PR status to get current merged/closed state
829
- print(f"πŸ” Updating PR status for reviewed PRs...")
830
- metadata_list = update_pr_status(metadata_list, token_pool)
 
 
831
 
832
- # Calculate memory savings
833
- import sys
834
- original_size = sys.getsizeof(str(all_prs))
835
- metadata_size = sys.getsizeof(str(metadata_list))
836
- savings_pct = ((original_size - metadata_size) / original_size * 100) if original_size > 0 else 0
837
 
838
- print(f"πŸ’Ύ Memory efficiency: {original_size // 1024}KB β†’ {metadata_size // 1024}KB (saved {savings_pct:.1f}%)")
 
839
 
840
- return metadata_list
841
 
842
 
843
  # =============================================================================
@@ -866,37 +574,6 @@ def group_metadata_by_date(metadata_list):
866
  return dict(grouped)
867
 
868
 
869
- def upload_with_retry(api, path_or_fileobj, path_in_repo, repo_id, repo_type, token, max_retries=5):
870
- """
871
- Upload file to HuggingFace with exponential backoff retry logic.
872
- """
873
- delay = 2.0
874
-
875
- for attempt in range(max_retries):
876
- try:
877
- api.upload_file(
878
- path_or_fileobj=path_or_fileobj,
879
- path_in_repo=path_in_repo,
880
- repo_id=repo_id,
881
- repo_type=repo_type,
882
- token=token
883
- )
884
- if attempt > 0:
885
- print(f" βœ“ Upload succeeded on attempt {attempt + 1}/{max_retries}")
886
- return True
887
-
888
- except Exception as e:
889
- if attempt < max_retries - 1:
890
- wait_time = delay + random.uniform(0, 1.0)
891
- print(f" ⚠️ Upload failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
892
- print(f" ⏳ Retrying in {wait_time:.1f} seconds...")
893
- time.sleep(wait_time)
894
- delay = min(delay * 2, 60.0)
895
- else:
896
- print(f" βœ— Upload failed after {max_retries} attempts: {str(e)}")
897
- raise
898
-
899
-
900
  def save_review_metadata_to_hf(metadata_list, agent_identifier):
901
  """
902
  Save review metadata to HuggingFace dataset, organized by [agent_identifier]/YYYY.MM.DD.jsonl.
@@ -909,7 +586,6 @@ def save_review_metadata_to_hf(metadata_list, agent_identifier):
909
  metadata_list: List of review metadata dictionaries
910
  agent_identifier: GitHub identifier of the agent (used as folder name)
911
  """
912
- import tempfile
913
  import shutil
914
 
915
  try:
@@ -987,7 +663,11 @@ def save_review_metadata_to_hf(metadata_list, agent_identifier):
987
 
988
 
989
  def load_agents_from_hf():
990
- """Load all agent metadata JSON files from HuggingFace dataset."""
 
 
 
 
991
  try:
992
  api = HfApi()
993
  agents = []
@@ -1011,6 +691,11 @@ def load_agents_from_hf():
1011
 
1012
  with open(file_path, 'r') as f:
1013
  agent_data = json.load(f)
 
 
 
 
 
1014
  agents.append(agent_data)
1015
 
1016
  except Exception as e:
@@ -1032,10 +717,8 @@ def load_agents_from_hf():
1032
  def mine_all_agents():
1033
  """
1034
  Mine review metadata for all agents within LEADERBOARD_TIME_FRAME_DAYS and save to HuggingFace.
 
1035
  """
1036
- tokens = get_github_tokens()
1037
- token_pool = TokenPool(tokens)
1038
-
1039
  # Load agent metadata from HuggingFace
1040
  agents = load_agents_from_hf()
1041
  if not agents:
@@ -1045,34 +728,43 @@ def mine_all_agents():
1045
  print(f"\n{'='*80}")
1046
  print(f"Starting review metadata mining for {len(agents)} agents")
1047
  print(f"Time frame: Last {LEADERBOARD_TIME_FRAME_DAYS} days")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1048
  print(f"{'='*80}\n")
1049
 
1050
- # Mine each agent
1051
  for agent in agents:
1052
  identifier = agent.get('github_identifier')
1053
- agent_name = agent.get('agent_name', 'Unknown')
1054
 
1055
  if not identifier:
1056
  print(f"Warning: Skipping agent without identifier: {agent}")
1057
  continue
1058
 
1059
- try:
1060
- print(f"\n{'='*80}")
1061
- print(f"Processing: {agent_name} ({identifier})")
1062
- print(f"{'='*80}")
1063
-
1064
- # Fetch review metadata
1065
- metadata = fetch_all_reviews_metadata(identifier, agent_name, token_pool)
1066
 
 
1067
  if metadata:
1068
- print(f"πŸ’Ύ Saving {len(metadata)} review records...")
1069
  save_review_metadata_to_hf(metadata, identifier)
1070
- print(f"βœ“ Successfully processed {agent_name}")
1071
  else:
1072
  print(f" No reviews found for {agent_name}")
1073
 
1074
  except Exception as e:
1075
- print(f"βœ— Error processing {identifier}: {str(e)}")
1076
  import traceback
1077
  traceback.print_exc()
1078
  continue
 
1
  """
2
  Minimalist Review Metadata Mining Script
3
+ Mines PR review metadata from GitHub Archive via BigQuery and saves to HuggingFace dataset.
4
  """
5
 
6
  import json
7
  import os
8
+ import tempfile
 
9
  from datetime import datetime, timezone, timedelta
10
  from collections import defaultdict
11
  from huggingface_hub import HfApi, hf_hub_download
12
  from dotenv import load_dotenv
13
+ from google.cloud import bigquery
14
 
15
  # Load environment variables
16
  load_dotenv()
 
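The get_bigquery_client helper added below (and duplicated in app.py above) writes the service-account JSON to a temporary file and points GOOGLE_APPLICATION_CREDENTIALS at it. This is not what the commit does, but the same client can be built without touching disk; a minimal sketch using google-auth's service_account module (get_bigquery_client_in_memory is a hypothetical name):

```python
# Hedged alternative to the temp-file credential handoff: construct the
# credentials object directly from the JSON string kept in the environment.
import json
import os

from google.cloud import bigquery
from google.oauth2 import service_account

def get_bigquery_client_in_memory():
    creds_json = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS_JSON')
    if not creds_json:
        raise ValueError("GOOGLE_APPLICATION_CREDENTIALS_JSON not found in environment")
    info = json.loads(creds_json)
    credentials = service_account.Credentials.from_service_account_info(info)
    return bigquery.Client(credentials=credentials, project=info.get('project_id'))
```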
51
  f.write(json.dumps(item) + '\n')
52
 
53
 
54
+ def get_hf_token():
55
+ """Get HuggingFace token from environment variables."""
56
+ token = os.getenv('HF_TOKEN')
57
+ if not token:
58
+ print("Warning: HF_TOKEN not found in environment variables")
59
+ return token
 
 
 
 
 
 
 
60
 
61
 
62
+ def get_bigquery_client():
63
  """
64
+ Initialize BigQuery client using credentials from environment variable.
65
 
66
+ Expects GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable containing
67
+ the service account JSON credentials as a string.
 
 
 
 
 
 
 
68
  """
69
+ # Get the JSON content from environment variable
70
+ creds_json = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS_JSON')
 
71
 
72
+ if creds_json:
73
+ # Create a temporary file to store credentials
74
+ with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
75
+ temp_file.write(creds_json)
76
+ temp_path = temp_file.name
77
 
78
+ # Set environment variable to point to temp file
79
+ os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = temp_path
 
 
80
 
81
+ # Initialize BigQuery client
82
+ client = bigquery.Client()
 
83
 
84
+ # Remove the temp credentials file; the client has already read it
85
+ os.unlink(temp_path)
86
 
87
+ return client
88
+ else:
89
+ raise ValueError("GOOGLE_APPLICATION_CREDENTIALS_JSON not found in environment")
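Since the githubarchive day tables are large, it can help to sanity-check the credentials and estimate scan cost before mining. A minimal sketch using BigQuery's dry-run mode (the table date below is only an example):

# Hedged sketch: validate credentials and estimate bytes scanned without running the query.
from google.cloud import bigquery

def estimate_scan_bytes(client, sql):
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    job = client.query(sql, job_config=job_config)
    return job.total_bytes_processed  # bytes the query would process

# client = get_bigquery_client()
# sql = "SELECT type FROM `githubarchive.day.20240101` WHERE type = 'PullRequestReviewEvent'"
# print(f"Would scan ~{estimate_scan_bytes(client, sql) / 1e9:.2f} GB")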
90
 
91
 
92
  # =============================================================================
93
+ # BIGQUERY FUNCTIONS
94
  # =============================================================================
95
 
96
+ def fetch_reviews_from_bigquery(client, identifier, start_date, end_date):
97
  """
98
+ Fetch PR review events from GitHub Archive for a specific agent.
99
+
100
+ Queries githubarchive.day.YYYYMMDD tables for PullRequestReviewEvent where
101
+ actor.login matches the agent identifier.
102
 
103
  Args:
104
+ client: BigQuery client instance
105
+ identifier: GitHub username or bot identifier (e.g., 'amazon-inspector-beta[bot]')
106
+ start_date: Start datetime (timezone-aware)
107
+ end_date: End datetime (timezone-aware)
108
 
109
+ Returns:
110
+ List of review event rows with PR information
111
  """
112
+ print(f"\nπŸ” Querying BigQuery for reviews by {identifier}")
113
+ print(f" Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
114
+
115
+ # Generate list of table names for each day in the range
116
+ table_refs = []
117
+ current_date = start_date
118
+ while current_date < end_date:
119
+ table_name = f"githubarchive.day.{current_date.strftime('%Y%m%d')}"
120
+ table_refs.append(table_name)
121
+ current_date += timedelta(days=1)
122
+
123
+ # Build UNION ALL query for all daily tables
124
+ union_parts = []
125
+ for table_name in table_refs:
126
+ union_parts.append(f"""
127
+ SELECT
128
+ repo.name as repo_name,
129
+ actor.login as actor_login,
130
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') as pr_url,
131
+ CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) as pr_number,
132
+ JSON_EXTRACT_SCALAR(payload, '$.review.submitted_at') as reviewed_at,
133
+ created_at
134
+ FROM `{table_name}`
135
+ WHERE type = 'PullRequestReviewEvent'
136
+ AND actor.login = @identifier
137
+ """)
138
+
139
+ query = " UNION ALL ".join(union_parts)
140
+
141
+ job_config = bigquery.QueryJobConfig(
142
+ query_parameters=[
143
+ bigquery.ScalarQueryParameter("identifier", "STRING", identifier)
144
+ ]
145
+ )
146
 
147
+ print(f" Querying {len(table_refs)} daily tables...")
148
 
149
+ try:
150
+ query_job = client.query(query, job_config=job_config)
151
+ results = list(query_job.result())
152
 
153
+ print(f" βœ“ Found {len(results)} review events")
154
+ return results
155
 
156
+ except Exception as e:
157
+ print(f" βœ— BigQuery error: {str(e)}")
158
+ return []
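The per-day UNION ALL above is straightforward, but GitHub Archive's daily tables also work with BigQuery table wildcards, which keeps the statement short for long time frames. A hedged alternative sketch (not the script's actual query; assumes the module's datetime imports):

# Hedged sketch: same filters expressed with a table wildcard instead of UNION ALL.
def build_wildcard_review_query(start_date, end_date):
    # githubarchive.day tables are named YYYYMMDD, so _TABLE_SUFFIX can bound the range
    start_suffix = start_date.strftime('%Y%m%d')
    end_suffix = (end_date - timedelta(days=1)).strftime('%Y%m%d')
    return f"""
    SELECT
      repo.name AS repo_name,
      actor.login AS actor_login,
      JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') AS pr_url,
      CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) AS pr_number,
      JSON_EXTRACT_SCALAR(payload, '$.review.submitted_at') AS reviewed_at,
      created_at
    FROM `githubarchive.day.*`
    WHERE _TABLE_SUFFIX BETWEEN '{start_suffix}' AND '{end_suffix}'
      AND type = 'PullRequestReviewEvent'
      AND actor.login = @identifier
    """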
159
 
160
 
161
+ def fetch_pr_status_from_bigquery(client, pr_urls, start_date, end_date):
162
  """
163
+ Fetch PR status (merged/closed) from GitHub Archive PullRequestEvent.
 
 
164
 
165
+ For each PR URL, looks for PullRequestEvent with action='closed' to determine
166
+ if the PR was merged or just closed.
167
+
168
+ Args:
169
+ client: BigQuery client instance
170
+ pr_urls: List of PR URLs to check status for
171
+ start_date: Start datetime (should cover review period and after)
172
+ end_date: End datetime (should be recent/current)
173
+
174
+ Returns:
175
+ Dictionary mapping PR URL to status dict:
176
+ {
177
+ 'pr_url': {
178
+ 'status': 'merged'|'closed'|'open',
179
+ 'merged': bool,
180
+ 'closed_at': timestamp or None
181
+ }
182
  }
183
+ """
184
+ if not pr_urls:
185
+ return {}
186
 
187
+ print(f"\nπŸ” Querying BigQuery for PR status ({len(pr_urls)} PRs)...")
188
 
189
+ # Extract repo and PR number from URLs
190
+ # URL format: https://github.com/owner/repo/pull/123
191
+ pr_info = []
192
+ for url in pr_urls:
193
+ try:
194
+ parts = url.replace('https://github.com/', '').split('/')
195
+ if len(parts) >= 4:
196
+ owner = parts[0]
197
+ repo = parts[1]
198
+ pr_number = int(parts[3])
199
+ repo_name = f"{owner}/{repo}"
200
+ pr_info.append({
201
+ 'url': url,
202
+ 'repo': repo_name,
203
+ 'number': pr_number
204
+ })
205
  except Exception as e:
206
+ print(f" Warning: Could not parse PR URL {url}: {e}")
207
+ continue
208
+
209
+ if not pr_info:
210
+ return {}
211
+
212
+ # Build repo filter condition for WHERE clause
213
+ # Group PRs by repo to create efficient filters
214
+ repos_to_prs = defaultdict(list)
215
+ for pr in pr_info:
216
+ repos_to_prs[pr['repo']].append(pr['number'])
217
+
218
+ # Generate list of table names for date range
219
+ # Look back a full year from end_date so close events on reviewed PRs are caught even if they fall outside the review window
220
+ pr_status_start = end_date - timedelta(days=365)
221
+ table_refs = []
222
+ current_date = pr_status_start
223
+ while current_date < end_date:
224
+ table_name = f"githubarchive.day.{current_date.strftime('%Y%m%d')}"
225
+ table_refs.append(table_name)
226
+ current_date += timedelta(days=1)
227
+
228
+ # Build WHERE clause to filter by specific repos and PR numbers
229
+ # Format: (repo='owner/repo1' AND pr_number IN (1,2,3)) OR (repo='owner/repo2' AND pr_number IN (4,5))
230
+ filter_conditions = []
231
+ for repo, pr_numbers in repos_to_prs.items():
232
+ pr_list = ','.join(map(str, pr_numbers))
233
+ filter_conditions.append(f"(repo.name = '{repo}' AND CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) IN ({pr_list}))")
234
+
235
+ pr_filter = " OR ".join(filter_conditions)
236
+
237
+ # Build query to find close/merge events for specific PRs
238
+ union_parts = []
239
+ for table_name in table_refs:
240
+ union_parts.append(f"""
241
+ SELECT
242
+ repo.name as repo_name,
243
+ CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) as pr_number,
244
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') as pr_url,
245
+ JSON_EXTRACT_SCALAR(payload, '$.action') as action,
246
+ CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.merged') AS BOOL) as merged,
247
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.closed_at') as closed_at,
248
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.merged_at') as merged_at,
249
+ created_at
250
+ FROM `{table_name}`
251
+ WHERE type = 'PullRequestEvent'
252
+ AND JSON_EXTRACT_SCALAR(payload, '$.action') = 'closed'
253
+ AND ({pr_filter})
254
+ """)
255
+
256
+ query = " UNION ALL ".join(union_parts)
257
+
258
+ print(f" Querying {len(table_refs)} daily tables for PR status (1-year lookback: {pr_status_start.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')})...")
259
+ print(f" Filtering for {len(pr_info)} specific PRs across {len(repos_to_prs)} repos")
260
 
261
+ try:
262
+ query_job = client.query(query)
263
+ results = list(query_job.result())
264
 
265
+ print(f" βœ“ Found {len(results)} PR close events")
266
 
267
+ # Build status map by PR URL
268
+ status_map = {}
269
+ for row in results:
270
+ pr_url = row.pr_url
271
 
272
+ merged = row.merged if row.merged is not None else False
273
+ closed_at = row.closed_at or row.merged_at
 
274
 
275
+ # Convert to ISO format if datetime
276
+ if hasattr(closed_at, 'isoformat'):
277
+ closed_at = closed_at.isoformat()
278
 
279
+ status = 'merged' if merged else 'closed'
280
 
281
+ status_map[pr_url] = {
282
+ 'status': status,
283
+ 'merged': merged,
284
+ 'closed_at': closed_at
285
+ }
286
 
287
+ # Mark remaining PRs as open
288
+ for url in pr_urls:
289
+ if url not in status_map:
290
+ status_map[url] = {
291
+ 'status': 'open',
292
+ 'merged': False,
293
+ 'closed_at': None
294
+ }
295
 
296
+ merged_count = sum(1 for s in status_map.values() if s['merged'])
297
+ closed_count = sum(1 for s in status_map.values() if s['status'] == 'closed')
298
+ open_count = sum(1 for s in status_map.values() if s['status'] == 'open')
 
 
299
 
300
+ print(f" Status breakdown: {merged_count} merged, {closed_count} closed, {open_count} open")
 
301
 
302
+ return status_map
 
303
 
304
+ except Exception as e:
305
+ print(f" βœ— BigQuery error: {str(e)}")
306
+ # Return all as open on error
307
+ return {url: {'status': 'open', 'merged': False, 'closed_at': None} for url in pr_urls}
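One subtlety the status map glosses over: a PR can be closed, reopened, and closed again, so several 'closed' events may match and the winner depends on row order. If that matters, keeping only the most recent event per URL makes the result deterministic; a minimal hedged sketch:

# Hedged sketch: keep only the latest close event per PR URL before building status_map.
def latest_close_events(rows):
    latest = {}
    for row in rows:
        prev = latest.get(row.pr_url)
        # created_at is the GitHub Archive event timestamp
        if prev is None or row.created_at > prev.created_at:
            latest[row.pr_url] = row
    return list(latest.values())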
308
 
309
 
310
+ def extract_review_metadata(review_row, status_info):
311
  """
312
+ Extract minimal PR review metadata from BigQuery row and status info.
 
313
 
314
+ Args:
315
+ review_row: BigQuery row from PullRequestReviewEvent query
316
+ status_info: Status dictionary from fetch_pr_status_from_bigquery
317
+
318
+ Returns:
319
+ Dictionary with review metadata
320
  """
321
+ pr_url = review_row.pr_url
322
+ pr_number = review_row.pr_number
323
+ reviewed_at = review_row.reviewed_at or review_row.created_at
324
+
325
+ # Convert to ISO format if datetime
326
+ if hasattr(reviewed_at, 'isoformat'):
327
+ reviewed_at = reviewed_at.isoformat()
328
 
329
  return {
330
  'html_url': pr_url,
331
+ 'reviewed_at': reviewed_at,
332
+ 'pr_status': status_info['status'],
333
+ 'pr_merged': status_info['merged'],
334
+ 'pr_closed_at': status_info['closed_at'],
335
+ 'pr_url': pr_url,
336
+ 'review_id': f"pr_{pr_number}"
337
  }
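Because the function only reads attributes off the row, it can be exercised without BigQuery by standing in a simple namespace object; a hedged sketch with hypothetical values:

# Hedged sketch: a fake row (hypothetical PR) is enough to test extract_review_metadata.
from types import SimpleNamespace

fake_row = SimpleNamespace(
    pr_url='https://github.com/octo-org/octo-repo/pull/42',
    pr_number=42,
    reviewed_at='2025-01-15T10:00:00Z',
    created_at='2025-01-15T10:00:00Z',
)
record = extract_review_metadata(fake_row, {'status': 'merged', 'merged': True, 'closed_at': None})
assert record['review_id'] == 'pr_42' and record['pr_status'] == 'merged'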
338
 
339
 
340
+ def fetch_all_reviews_metadata(identifier, agent_name):
341
  """
342
+ Fetch PR reviews associated with a GitHub user or bot for the past LEADERBOARD_TIME_FRAME_DAYS.
343
+ Uses BigQuery to query GitHub Archive instead of the GitHub API.
 
 
344
 
345
  Args:
346
+ identifier: GitHub username or bot identifier (for BigQuery queries)
347
+ agent_name: Human-readable name of the agent (for display only)
348
 
349
  Returns:
350
+ List of dictionaries containing minimal PR review metadata with PR status
351
  """
352
+ # Initialize BigQuery client
353
+ try:
354
+ client = get_bigquery_client()
355
+ except Exception as e:
356
+ print(f"βœ— Failed to initialize BigQuery client: {str(e)}")
357
+ return []
358
 
359
+ # Define time range: past LEADERBOARD_TIME_FRAME_DAYS (excluding today)
360
+ current_time = datetime.now(timezone.utc)
361
+ end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0) # 12:00 AM UTC today
362
+ start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
363
 
364
+ print(f"\n{'='*80}")
365
+ print(f"Fetching reviews for: {agent_name} ({identifier})")
366
+ print(f"{'='*80}")
 
367
 
368
+ # Fetch review events from BigQuery
369
+ review_rows = fetch_reviews_from_bigquery(client, identifier, start_date, end_date)
370
 
371
+ if not review_rows:
372
+ print(f" No reviews found for {identifier}")
373
+ return []
374
 
375
+ # Extract unique PR URLs
376
+ pr_urls = list(set([row.pr_url for row in review_rows if row.pr_url]))
377
+ print(f"\nπŸ“Š Found {len(review_rows)} review events across {len(pr_urls)} unique PRs")
378
+
379
+ # Fetch PR status from BigQuery
380
+ # Use extended end date to catch recent merges/closes
381
+ extended_end_date = current_time
382
+ status_map = fetch_pr_status_from_bigquery(client, pr_urls, start_date, extended_end_date)
383
+
384
+ # Extract metadata for each review
385
+ print(f"\nπŸ“¦ Extracting metadata...")
386
+ metadata_list = []
387
+
388
+ # Deduplicate by PR URL (multiple reviews on same PR)
389
+ seen_prs = set()
390
+ for row in review_rows:
391
+ pr_url = row.pr_url
392
+ if pr_url in seen_prs:
393
  continue
394
+ seen_prs.add(pr_url)
395
 
396
+ status_info = status_map.get(pr_url, {
397
+ 'status': 'open',
398
+ 'merged': False,
399
+ 'closed_at': None
400
+ })
401
 
402
+ metadata = extract_review_metadata(row, status_info)
403
+ metadata_list.append(metadata)
404
 
405
+ print(f" βœ“ Extracted {len(metadata_list)} unique PR review records")
406
 
407
+ return metadata_list
408
 
409
 
410
+ def fetch_all_reviews_metadata_batch(agents):
411
+ """
412
+ Fetch PR reviews for ALL agents in a single batch operation.
413
+ Uses only 2 BigQuery queries total (instead of 2*N queries for N agents).
414
 
415
  Args:
416
+ agents: List of agent dictionaries with 'github_identifier' and 'name' fields
 
 
417
 
418
  Returns:
419
+ Dictionary mapping agent identifier to list of review metadata:
420
+ {
421
+ 'agent-identifier': [metadata_list],
422
+ ...
423
+ }
424
  """
425
+ if not agents:
426
+ return {}
427
 
428
+ # Initialize BigQuery client
429
+ try:
430
+ client = get_bigquery_client()
431
+ except Exception as e:
432
+ print(f"βœ— Failed to initialize BigQuery client: {str(e)}")
433
+ return {}
434
 
435
  # Define time range: past LEADERBOARD_TIME_FRAME_DAYS (excluding today)
436
  current_time = datetime.now(timezone.utc)
437
+ end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
438
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
439
 
440
+ # Extract all identifiers
441
+ identifiers = [agent['github_identifier'] for agent in agents if agent.get('github_identifier')]
442
+ if not identifiers:
443
+ return {}
444
+
445
+ print(f"\nπŸš€ BATCH MODE: Fetching reviews for {len(identifiers)} agents in 2 queries")
446
+ print(f" Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
447
+
448
+ # =========================================================================
449
+ # QUERY 1: Fetch ALL review events for ALL agents in one query
450
+ # =========================================================================
451
+ print(f"\nπŸ” Query 1/2: Fetching ALL review events...")
452
+
453
+ # Generate list of table names
454
+ table_refs = []
455
+ current_date = start_date
456
+ while current_date < end_date:
457
+ table_name = f"githubarchive.day.{current_date.strftime('%Y%m%d')}"
458
+ table_refs.append(table_name)
459
+ current_date += timedelta(days=1)
460
+
461
+ # Build IN clause for all identifiers
462
+ identifier_list = ', '.join([f"'{id}'" for id in identifiers])
463
+
464
+ # Build UNION ALL query for all daily tables
465
+ union_parts = []
466
+ for table_name in table_refs:
467
+ union_parts.append(f"""
468
+ SELECT
469
+ repo.name as repo_name,
470
+ actor.login as actor_login,
471
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') as pr_url,
472
+ CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.number') AS INT64) as pr_number,
473
+ JSON_EXTRACT_SCALAR(payload, '$.review.submitted_at') as reviewed_at,
474
+ created_at
475
+ FROM `{table_name}`
476
+ WHERE type = 'PullRequestReviewEvent'
477
+ AND actor.login IN ({identifier_list})
478
+ """)
479
+
480
+ query = " UNION ALL ".join(union_parts)
481
+
482
+ print(f" Querying {len(table_refs)} daily tables...")
483
+
484
+ try:
485
+ query_job = client.query(query)
486
+ all_review_rows = list(query_job.result())
487
+ print(f" βœ“ Found {len(all_review_rows)} total review events")
488
+ except Exception as e:
489
+ print(f" βœ— BigQuery error: {str(e)}")
490
+ return {}
491
+
492
+ # Group reviews by agent
493
+ reviews_by_agent = defaultdict(list)
494
+ all_pr_urls = set()
495
+ for row in all_review_rows:
496
+ reviews_by_agent[row.actor_login].append(row)
497
+ if row.pr_url:
498
+ all_pr_urls.add(row.pr_url)
499
+
500
+ print(f" πŸ“Š Reviews found for {len(reviews_by_agent)} agents")
501
+ print(f" πŸ“Š {len(all_pr_urls)} unique PRs to check status for")
502
+
503
+ # =========================================================================
504
+ # QUERY 2: Fetch ALL PR statuses in one query
505
+ # =========================================================================
506
+ if all_pr_urls:
507
+ print(f"\nπŸ” Query 2/2: Fetching ALL PR statuses...")
508
+ extended_end_date = current_time
509
+ status_map = fetch_pr_status_from_bigquery(client, list(all_pr_urls), start_date, extended_end_date)
510
  else:
511
+ status_map = {}
512
 
513
+ # =========================================================================
514
+ # Post-process: Build metadata for each agent
515
+ # =========================================================================
516
+ print(f"\nπŸ“¦ Processing metadata for each agent...")
517
+ results = {}
518
 
519
+ for agent in agents:
520
+ identifier = agent.get('github_identifier')
521
+ if not identifier or identifier not in reviews_by_agent:
522
+ results[identifier] = []
523
+ continue
524
 
525
+ review_rows = reviews_by_agent[identifier]
 
526
 
527
+ # Deduplicate by PR URL
528
+ metadata_list = []
529
+ seen_prs = set()
530
+ for row in review_rows:
531
+ pr_url = row.pr_url
532
+ if pr_url in seen_prs:
533
+ continue
534
+ seen_prs.add(pr_url)
535
 
536
+ status_info = status_map.get(pr_url, {
537
+ 'status': 'open',
538
+ 'merged': False,
539
+ 'closed_at': None
540
+ })
541
 
542
+ metadata = extract_review_metadata(row, status_info)
543
+ metadata_list.append(metadata)
544
 
545
+ results[identifier] = metadata_list
546
+ print(f" βœ“ {agent.get('name', identifier)}: {len(metadata_list)} unique PRs")
547
 
548
+ return results
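The IN list above is interpolated straight into the SQL; if an identifier ever contained a quote the query would break. A hedged sketch of the parameterized form BigQuery supports (array parameter plus UNNEST):

# Hedged sketch: pass identifiers as an array query parameter instead of inlining them.
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter("identifiers", "STRING", identifiers)
    ]
)
# ...with the SQL filter written as: AND actor.login IN UNNEST(@identifiers)
# query_job = client.query(query, job_config=job_config)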
549
 
550
 
551
  # =============================================================================
 
574
  return dict(grouped)
575
 
576
 
577
  def save_review_metadata_to_hf(metadata_list, agent_identifier):
578
  """
579
  Save review metadata to HuggingFace dataset, organized by [agent_identifier]/YYYY.MM.DD.jsonl.
 
586
  metadata_list: List of review metadata dictionaries
587
  agent_identifier: GitHub identifier of the agent (used as folder name)
588
  """
 
589
  import shutil
590
 
591
  try:
 
663
 
664
 
665
  def load_agents_from_hf():
666
+ """
667
+ Load all agent metadata JSON files from HuggingFace dataset.
668
+
669
+ The github_identifier is extracted from the filename (e.g., 'agent-name[bot].json' -> 'agent-name[bot]')
670
+ """
671
  try:
672
  api = HfApi()
673
  agents = []
 
691
 
692
  with open(file_path, 'r') as f:
693
  agent_data = json.load(f)
694
+
695
+ # Extract github_identifier from filename (remove .json extension)
696
+ github_identifier = json_file.replace('.json', '')
697
+ agent_data['github_identifier'] = github_identifier
698
+
699
  agents.append(agent_data)
700
 
701
  except Exception as e:
 
717
  def mine_all_agents():
718
  """
719
  Mine review metadata for all agents within LEADERBOARD_TIME_FRAME_DAYS and save to HuggingFace.
720
+ Uses BigQuery to query GitHub Archive with batch processing (only 2 queries for all agents).
721
  """
722
  # Load agent metadata from HuggingFace
723
  agents = load_agents_from_hf()
724
  if not agents:
 
728
  print(f"\n{'='*80}")
729
  print(f"Starting review metadata mining for {len(agents)} agents")
730
  print(f"Time frame: Last {LEADERBOARD_TIME_FRAME_DAYS} days")
731
+ print(f"Data source: BigQuery + GitHub Archive (BATCH MODE)")
732
+ print(f"{'='*80}\n")
733
+
734
+ # Fetch ALL reviews for ALL agents in batch (only 2 BigQuery queries total!)
735
+ try:
736
+ all_metadata = fetch_all_reviews_metadata_batch(agents)
737
+ except Exception as e:
738
+ print(f"βœ— Error during batch fetch: {str(e)}")
739
+ import traceback
740
+ traceback.print_exc()
741
+ return
742
+
743
+ # Save results for each agent
744
+ print(f"\n{'='*80}")
745
+ print(f"πŸ’Ύ Saving results to HuggingFace...")
746
  print(f"{'='*80}\n")
747
 
 
748
  for agent in agents:
749
  identifier = agent.get('github_identifier')
750
+ agent_name = agent.get('name', agent.get('agent_name', 'Unknown'))
751
 
752
  if not identifier:
753
  print(f"Warning: Skipping agent without identifier: {agent}")
754
  continue
755
 
756
+ metadata = all_metadata.get(identifier, [])
757
 
758
+ try:
759
  if metadata:
760
+ print(f"πŸ’Ύ {agent_name}: Saving {len(metadata)} review records...")
761
  save_review_metadata_to_hf(metadata, identifier)
762
+ print(f" βœ“ Successfully saved")
763
  else:
764
  print(f" No reviews found for {agent_name}")
765
 
766
  except Exception as e:
767
+ print(f"βœ— Error saving {identifier}: {str(e)}")
768
  import traceback
769
  traceback.print_exc()
770
  continue
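If the mining should also run on a schedule (APScheduler is already in requirements.txt), a minimal hedged sketch of a daily job; the 02:00 UTC time is arbitrary:

# Hedged sketch: run mine_all_agents once a day (time chosen arbitrarily).
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = BlockingScheduler(timezone='UTC')
scheduler.add_job(mine_all_agents, CronTrigger(hour=2, minute=0))
scheduler.start()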
requirements.txt CHANGED
@@ -1,5 +1,7 @@
1
  APScheduler
2
  datasets
 
 
3
  gradio
4
  gradio_leaderboard
5
  huggingface_hub
 
1
  APScheduler
2
  datasets
3
+ db-dtypes
4
+ google-cloud-bigquery
5
  gradio
6
  gradio_leaderboard
7
  huggingface_hub