Spaces:
Running
Running
| import polars as pl | |
| from data import data_df | |
| from types import SimpleNamespace | |
| from convert import verify_and_return_presult | |
# Reusable polars predicate: true for rows that have both location
# coordinates ('x' and 'y') and a strictly positive 'ballSpeed'.
valid_pitch = (
    pl.col('x').is_not_null()
    & pl.col('y').is_not_null()
    & (pl.col('ballSpeed') > 0)
)
def filter_data_by_date_and_game_kind(data, start_date=None, end_date=None, game_kind=None):
    """Narrow ``data`` by an inclusive date window and/or a game kind.

    Args:
        data: Frame exposing ``.filter``; rows are compared on the ``date``
            and ``coarse_game_kind`` columns.
        start_date: Keep rows whose ``date`` is >= this value. ``None`` skips
            the lower bound.
        end_date: Keep rows whose ``date`` is <= this value. ``None`` skips
            the upper bound.
        game_kind: Keep rows whose ``coarse_game_kind`` equals this value.
            ``None`` skips the check.

    Returns:
        The filtered frame, or ``data`` itself when every criterion is ``None``.
    """
    conditions = []
    if start_date is not None:
        conditions.append(pl.col('date') >= start_date)
    if end_date is not None:
        conditions.append(pl.col('date') <= end_date)
    if game_kind is not None:
        conditions.append(pl.col('coarse_game_kind') == game_kind)

    # Apply the active predicates one after another, in declaration order.
    for condition in conditions:
        data = data.filter(condition)
    return data
def compute_team_games(data):
    """Attach each team's total number of games to every row of ``data``.

    The number of distinct ``gameId`` values is counted separately for each
    ``HomeTeamNameES`` and each ``VisitorTeamNameES``. The two counts are
    summed per team name, and the total is joined back onto the rows.

    Args:
        data: Frame with ``gameId``, ``HomeTeamNameES`` and
            ``VisitorTeamNameES`` columns.

    Returns:
        ``data`` with two extra columns: ``home_games`` holds the total games
        (home + visiting) of the row's home team, and ``visitor_games`` holds
        the total games of the row's visiting team.
    """
    distinct_games = pl.col('gameId').unique().len()
    counted = data.with_columns(
        distinct_games.over('HomeTeamNameES').alias('home_games'),
        distinct_games.over('VisitorTeamNameES').alias('visitor_games'),
    )

    # One row per team for each role, carrying that role's game count.
    as_home = (
        counted
        .group_by('HomeTeamNameES')
        .first()
        .select(['HomeTeamNameES', 'home_games'])
        .rename({'HomeTeamNameES': 'team'})
    )
    as_visitor = (
        counted
        .group_by('VisitorTeamNameES')
        .first()
        .select(['VisitorTeamNameES', 'visitor_games'])
        .rename({'VisitorTeamNameES': 'team'})
    )

    # Full join so a team that appears in only one role is kept; its missing
    # count becomes 0 and its name is taken from whichever side has it.
    merged = as_home.join(as_visitor, on='team', how='full').fill_null(0)
    game_data = merged.with_columns(
        (pl.col('home_games') + pl.col('visitor_games')).alias('games'),
        pl.when(pl.col('team').is_null())
        .then(pl.col('team_right'))
        .otherwise(pl.col('team'))
        .alias('team'),
    )
    totals = game_data.select(['team', 'games'])

    # Replace the per-role counts with each team's overall total.
    stripped = counted.drop('home_games', 'visitor_games')
    with_home_total = stripped.join(
        totals.rename({'games': 'home_games'}),
        left_on='HomeTeamNameES',
        right_on='team',
    )
    return with_home_total.join(
        totals.rename({'games': 'visitor_games'}),
        left_on='VisitorTeamNameES',
        right_on='team',
    )
def compute_pitch_stats(data, player_type, pitch_class_type, min_pitches=1, pitcher_lr='both', batter_lr='both', group_by_team=False):
    """Aggregate pitch-level rows into one row per player and pitch type.

    Args:
        data: Pitch-level frame. Columns read here include ``pitId``/``batId``,
            ``pitLR``, ``batLR``, ``ballSpeed``, ``x``, ``y``, ``swing``,
            ``zone``, ``whiff``, ``csw``, ``pitch``, ``aux_bresult`` and the
            pitch-kind code/name columns.
        player_type: ``'pitcher'`` or ``'batter'``; selects the id, name and
            team columns used for grouping.
        pitch_class_type: ``'specific'`` groups on ``ballKind_code``;
            ``'general'`` groups on ``general_ballKind_code`` (renamed to
            ``ballKind_code``/``ballKind`` in the output).
        min_pitches: A row is ``qualified`` when its pitch ``count`` is at
            least this value; only qualified rows receive percentiles.
        pitcher_lr: ``'both'``, ``'l'`` or ``'r'``; filters on ``pitLR``.
        batter_lr: ``'both'``, ``'l'`` or ``'r'``; filters on ``batLR``.
        group_by_team: When true, the player's team column is added to the
            grouping keys.

    Returns:
        Frame with counts, velocity, plate-discipline, location and
        batted-ball rates per group, a ``usage`` share, a ``qualified`` flag
        and ``<stat>_pctl`` columns, sorted by player id then descending count.

    Raises:
        AssertionError: If one of the enumerated arguments has an unsupported
            value.
    """
    assert pitcher_lr in ('both', 'l', 'r')
    assert batter_lr in ('both', 'l', 'r')
    assert player_type in ('pitcher', 'batter')
    assert pitch_class_type in ('general', 'specific')

    pitching = player_type == 'pitcher'
    specific = pitch_class_type == 'specific'

    # handedness filters
    if pitcher_lr != 'both':
        data = data.filter(pl.col('pitLR') == pitcher_lr)
    if batter_lr != 'both':
        data = data.filter(pl.col('batLR') == batter_lr)

    # column names
    id_cols = ['pitId' if pitching else 'batId']
    if group_by_team:
        id_cols.append('pitcher_team_name_short' if pitching else 'batter_team_name_short')
    name_col = 'pitcher_name' if pitching else 'batter_name'
    pitch_col = 'ballKind_code' if specific else 'general_ballKind_code'
    pitch_name_col = 'ballKind' if specific else 'general_ballKind'

    # Only pitches with a location and a positive speed feed the averages.
    tracked = pl.col('x').is_not_null() & pl.col('y').is_not_null() & (pl.col('ballSpeed') > 0)

    pctl_stats = [
        'Avg KPH', 'Max KPH', 'Avg MPH', 'Max MPH', 'Swing%', 'Z-Swing%', 'Chase%',
        'Contact%', 'Z-Contact%', 'O-Contact%', 'SwStr%', 'Whiff%', 'CSW%',
        'GB%', 'FB%', 'LD%', 'Zone%',
    ]

    def pctl(stat):
        # Rank among qualified rows only; these stats are ranked descending.
        descending = stat in ['FB%', 'LD%'] or 'Contact%' in stat
        qualified_values = pl.when(pl.col('qualified')).then(pl.col(stat))
        return (qualified_values.rank(descending=descending) / qualified_values.count()).alias(f'{stat}_pctl')

    pitch_stats = (
        data
        .with_columns((pl.col('ballSpeed') / 1.609).round(1).alias('mph'))
        .group_by(*id_cols, pitch_col)
        .agg(
            pl.first(name_col),
            pl.col('pitLR').first().str.to_uppercase().alias('Throws'),
            *([pl.first('general_ballKind')] if specific else []),
            pl.first(pitch_name_col),
            pl.len().alias('count'),
            pl.when(tracked).then(pl.col('ballSpeed')).mean().alias('Avg KPH'),
            pl.col('ballSpeed').max().alias('Max KPH'),
            pl.when(tracked).then(pl.col('mph')).mean().round(1).alias('Avg MPH'),
            pl.col('mph').max().alias('Max MPH'),
            # List of {batType, proportion} structs per group; pivoted below.
            pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True),
            (pl.col('swing').sum() / pl.col('pitch').sum()).alias('Swing%'),
            ((pl.col('swing') & pl.col('zone')).sum() / pl.col('pitch').sum()).alias('Z-Swing%'),
            ((pl.col('swing') & ~pl.col('zone')).sum() / pl.col('pitch').sum()).alias('Chase%'),
            ((pl.col('swing') & ~pl.col('whiff')).sum() / pl.col('swing').sum()).alias('Contact%'),
            ((pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum() / (pl.col('zone') & pl.col('swing')).sum()).alias('Z-Contact%'),
            ((~pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum() / (~pl.col('zone') & pl.col('swing')).sum()).alias('O-Contact%'),
            (pl.col('whiff').sum() / pl.col('swing').sum()).alias('Whiff%'),
            (pl.col('whiff').sum() / pl.col('pitch').sum()).alias('SwStr%'),
            (pl.col('csw').sum() / pl.col('pitch').sum()).alias('CSW%'),
            (pl.col('zone').sum() / pl.col('pitch').sum()).alias('Zone%'),
            # The sign of x that counts as glove/arm side flips with pitLR.
            (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') < 0).otherwise(pl.col('x') > 0)).mean().alias('Glove%'),
            (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') >= 0).otherwise(pl.col('x') <= 0)).mean().alias('Arm%'),
            (pl.col('y') > 125).mean().alias('High%'),
            (pl.col('y') <= 125).mean().alias('Low%'),
            (pl.col('x').is_between(-20, 20) & pl.col('y').is_between(100, 100 + 50)).mean().alias('MM%'),
        )
        .with_columns(
            # Share of the player's pitches that are of this pitch type.
            # Partition by the player id actually grouped on: 'pitId' is not a
            # column of the aggregated frame when grouping batters by 'batId'.
            (pl.col('count') / pl.sum('count').over(id_cols[0])).alias('usage'),
            (pl.col('count') >= min_pitches).alias('qualified'),
        )
        .explode('batType')
        .unnest('batType')
        .pivot(on='batType', values='proportion')
        .fill_null(0)
        .with_columns(
            (pl.col('G') + pl.col('B')).alias('GB%'),
            (pl.col('F') + pl.col('P')).alias('FB%'),
            pl.col('L').alias('LD%').round(2),
        )
        .drop('G', 'F', 'B', 'P', 'L')
        # The 'null' column only exists when some group had no batted balls,
        # so its absence must not be an error.
        .drop('null', strict=False)
        .with_columns([pctl(stat) for stat in pctl_stats])
        .rename({} if specific else {pitch_col: 'ballKind_code', pitch_name_col: 'ballKind'})
        .sort(id_cols[0], 'count', descending=[False, True])
    )
    return pitch_stats
def compute_player_stats(data, player_type, qual='qualified', pitcher_lr='both', batter_lr='both', group_by_team=False):
    """Aggregate pitch-level rows into one row per player (or per team).

    Args:
        data: Pitch-level frame; passed through ``compute_team_games`` before
            aggregation.
        player_type: ``'pitcher'``, ``'batter'``, ``'team pitching'`` or
            ``'team batting'``.
        qual: ``'qualified'`` marks a row qualified when IP >= 1 x games
            (pitching) or PA >= 3.1 x games (batting). Any other value is
            used directly as the minimum IP (pitching) or PA (batting).
        pitcher_lr: ``'both'``, ``'l'`` or ``'r'``; filters on ``pitLR``.
        batter_lr: ``'both'``, ``'l'`` or ``'r'``; filters on ``batLR``.
        group_by_team: When true, the team column is added to the grouping
            keys for player-level stats.

    Returns:
        Frame of rate stats and ``<stat>_pctl`` percentile columns, sorted by
        IP (pitching) or PA (batting) in descending order.

    Raises:
        AssertionError: If one of the enumerated arguments has an unsupported
            value.
    """
    assert pitcher_lr in ('both', 'l', 'r')
    assert batter_lr in ('both', 'l', 'r')
    assert player_type in ('pitcher', 'batter', 'team pitching', 'team batting')

    # Which side of the ball, and whether rows are players or whole teams.
    pitching = player_type in ('pitcher', 'team pitching')
    team = player_type in ('team pitching', 'team batting')

    if pitcher_lr != 'both':
        data = data.filter(pl.col('pitLR') == pitcher_lr)
    if batter_lr != 'both':
        data = data.filter(pl.col('batLR') == batter_lr)

    team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short'
    if team:
        over_col = team_col
    else:
        over_col = 'pitId' if pitching else 'batId'

    # NOTE(review): the 'games' window here and the 'FB Velo' window further
    # down are partitioned by 'pitId' for every player_type - confirm that is
    # intended for the batter/team variants.
    outs_recorded = (
        pl.col('bso').struct.field('o').cast(pl.Int32)
        - pl.col('beforeBso').struct.field('o').cast(pl.Int32)
    )
    data = compute_team_games(data).with_columns(
        # String arguments to then/otherwise are parsed as column names.
        pl.when(pl.col('half_inning').str.ends_with('1')).then('home_games').otherwise('visitor_games').first().over('pitId').alias('games'),
        outs_recorded.sum().mul(1/3).over(over_col).alias('IP'),
        pl.col('pa_code').unique().len().over(over_col).alias('PA'),
    )

    # Qualification threshold: a per-game factor or a caller-supplied minimum.
    qualified_factor = 1 if pitching else 3.1
    qual_col = 'IP' if pitching else 'PA'
    if qual == 'qualified':
        threshold = qualified_factor * pl.col('games')
    else:
        threshold = qual
    data = data.with_columns((pl.col(qual_col) >= threshold).alias('qualified'))

    # Direction in which each stat is ranked for its percentile.
    if pitching:
        def stat_descending_pctl(stat):
            return stat in ['BB%', 'FB%', 'LD%', 'Z-Swing%'] or 'Contact%' in stat
    else:
        def stat_descending_pctl(stat):
            return not (stat in ['BB%', 'FB%', 'LD%', 'Swing%', 'Z-Swing%'] or 'Contact%' in stat)

    # Grouping keys and display-name column.
    if player_type == 'pitcher':
        id_cols = ['pitId']
        name_col = 'pitcher_name'
    elif player_type == 'batter':
        id_cols = ['batId']
        name_col = 'batter_name'
    else:
        id_cols = []
        name_col = None
    if group_by_team or team:
        id_cols.append(team_col)

    handedness_col = 'pitLR' if pitching else 'batLR'
    new_handedness_col = 'Throws' if pitching else 'Bats'
    own_lr_is_both = (pitcher_lr == 'both') if pitching else (batter_lr == 'both')

    # Identity columns that are only meaningful for some groupings.
    leading_aggs = []
    if not team:
        leading_aggs.append(pl.col(name_col).first())
    if not (group_by_team or team):
        leading_aggs.append(pl.col(team_col).last())
    if not (team and own_lr_is_both):
        leading_aggs.append(pl.col(handedness_col).first().str.to_uppercase().alias(new_handedness_col))

    pctl_stats = [
        'FB Velo', 'K%', 'BB%', 'Swing%', 'Z-Swing%', 'Chase%', 'Contact%',
        'Z-Contact%', 'O-Contact%', 'SwStr%', 'Whiff%', 'CSW%', 'GB%', 'FB%',
        'LD%', 'Zone%',
    ]

    player_stats = (
        data
        .with_columns(pl.when(pl.col('general_ballKind_code').is_in(['4S', 'FC', 'SI'])).then(pl.when(valid_pitch).then('ballSpeed').mean().over('pitId', 'general_ballKind_code')).mul(1/1.609).round(1).alias('FB Velo'))
        .group_by(id_cols)
        .agg(
            *leading_aggs,
            pl.col('IP').first(),
            pl.col('PA').first(),
            pl.col('FB Velo').max(),
            (pl.when(pl.col('presult').str.contains('strikeout')).then(1).otherwise(0).sum() / pl.col('pa_code').unique().len()).alias('K%'),
            (pl.when(pl.col('presult') == 'Walk').then(1).otherwise(0).sum() / pl.col('pa_code').unique().len()).alias('BB%'),
            # List of {batType, proportion} structs per group; pivoted below.
            pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True),
            (pl.col('swing').sum() / pl.col('pitch').sum()).alias('Swing%'),
            ((pl.col('swing') & pl.col('zone')).sum() / pl.col('pitch').sum()).alias('Z-Swing%'),
            ((pl.col('swing') & ~pl.col('zone')).sum() / pl.col('pitch').sum()).alias('Chase%'),
            ((pl.col('swing') & ~pl.col('whiff')).sum()/pl.col('swing').sum()).alias('Contact%'),
            ((pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum()/(pl.col('zone') & pl.col('swing')).sum()).alias('Z-Contact%'),
            ((~pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum()/(~pl.col('zone') & pl.col('swing')).sum()).alias('O-Contact%'),
            (pl.col('whiff').sum() / pl.col('swing').sum()).alias('Whiff%'),
            (pl.col('whiff').sum() / pl.col('pitch').sum()).alias('SwStr%'),
            (pl.col('csw').sum() / pl.col('pitch').sum()).alias('CSW%'),
            (pl.col('zone').sum() / pl.col('pitch').sum()).alias('Zone%'),
            (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') < 0).otherwise(pl.col('x') > 0)).mean().alias('Glove%'),
            (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') >= 0).otherwise(pl.col('x') <= 0)).mean().alias('Arm%'),
            (pl.col('y') > 125).mean().alias('High%'),
            (pl.col('y') <= 125).mean().alias('Low%'),
            (pl.col('x').is_between(-20, 20) & pl.col('y').is_between(100, 100+50)).mean().alias('MM%'),
            pl.first('qualified'),
        )
        .explode('batType')
        .unnest('batType')
        .pivot(on='batType', values='proportion')
        .fill_null(0)
        .with_columns(
            (pl.col('G') + pl.col('B')).alias('GB%'),
            (pl.col('F') + pl.col('P')).alias('FB%'),
            pl.col('L').alias('LD%'),
        )
        .drop('G', 'F', 'B', 'P', 'L')
        .with_columns([
            # Percentile among qualified rows only; unqualified rows get null.
            (pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=stat_descending_pctl(stat))/pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl')
            for stat in pctl_stats
        ])
        .sort(qual_col, descending=True)
    )
    return player_stats
def get_pitcher_stats(id, lr='both', game_kind=None, start_date=None, end_date=None, min_ip=1, min_pitches=1, pitch_class_type='specific'):
    """Collect the summary, per-pitch and raw pitch-shape data for one pitcher.

    Args:
        id: Value matched against the ``pitId`` column.
        lr: Batter handedness to restrict to (``'l'``/``'r'``), or ``'both'``
            for no restriction.
        game_kind: Optional ``coarse_game_kind`` filter.
        start_date: Optional inclusive lower bound on ``date``.
        end_date: Optional inclusive upper bound on ``date``.
        min_ip: Minimum IP passed to ``compute_player_stats`` as ``qual``.
        min_pitches: Minimum pitch count passed to ``compute_pitch_stats``.
        pitch_class_type: ``'specific'`` or ``'general'`` pitch grouping.

    Returns:
        ``SimpleNamespace`` with ``pitcher_stats``, ``pitch_stats`` and
        ``pitch_shapes`` frames, each restricted to the requested pitcher.
    """
    source_data = filter_data_by_date_and_game_kind(
        data_df,
        start_date=start_date,
        end_date=end_date,
        game_kind=game_kind,
    )
    is_requested_pitcher = pl.col('pitId') == id

    # Stats are computed over all pitchers, then narrowed to the requested one.
    pitch_stats = compute_pitch_stats(
        source_data,
        player_type='pitcher',
        pitch_class_type=pitch_class_type,
        min_pitches=min_pitches,
        batter_lr=lr,
        group_by_team=False,
    ).filter(is_requested_pitcher)

    # Raw per-pitch rows: only pitches with a location and a positive speed.
    if lr != 'both':
        shape_source = source_data.filter(pl.col('batLR') == lr)
    else:
        shape_source = source_data
    pitch_shapes = (
        shape_source
        .filter(
            is_requested_pitcher &
            pl.col('x').is_not_null() &
            pl.col('y').is_not_null() &
            (pl.col('ballSpeed') > 0)
        )
        .select(['pitId', 'general_ballKind_code', 'ballKind_code', 'ballSpeed', 'x', 'y'])
        .with_columns((pl.col('ballSpeed')/1.609).alias('ballSpeed_mph'))
    )

    pitcher_stats = compute_player_stats(
        source_data,
        player_type='pitcher',
        qual=min_ip,
        batter_lr=lr,
        group_by_team=False,
    ).filter(is_requested_pitcher)

    return SimpleNamespace(pitcher_stats=pitcher_stats, pitch_stats=pitch_stats, pitch_shapes=pitch_shapes)