npb_data_app / stats.py
patrickramos's picture
Add team pitching leaderboard
d0e7981
raw
history blame
13.6 kB
import polars as pl
from data import data_df
from types import SimpleNamespace
from convert import verify_and_return_presult
# Row-level predicate shared by the stat builders: keeps only rows where both
# `x` and `y` are present and `ballSpeed` is strictly positive.
valid_pitch = (
    pl.col('x').is_not_null()
    & pl.col('y').is_not_null()
    & (pl.col('ballSpeed') > 0)
)
def filter_data_by_date_and_game_kind(data, start_date=None, end_date=None, game_kind=None):
    """Narrow `data` by an inclusive date window and/or a game kind.

    Each criterion is optional; a value of ``None`` means "do not filter on
    this". When every criterion is ``None`` the input is returned unchanged.

    Args:
        data: Frame exposing `date` and `coarse_game_kind` columns.
        start_date: Keep rows with ``date >= start_date``.
        end_date: Keep rows with ``date <= end_date``.
        game_kind: Keep rows whose ``coarse_game_kind`` equals this value.

    Returns:
        The filtered frame.
    """
    # (requested value, predicate factory), applied in this order.
    criteria = (
        (start_date, lambda bound: pl.col('date') >= bound),
        (end_date, lambda bound: pl.col('date') <= bound),
        (game_kind, lambda kind: pl.col('coarse_game_kind') == kind),
    )
    for wanted, make_predicate in criteria:
        if wanted is None:
            continue
        data = data.filter(make_predicate(wanted))
    return data
def compute_team_games(data):
    """Attach each club's total number of games to every row of `data`.

    The number of distinct `gameId` values is counted per home team and per
    visiting team, and the two counts are added per team. The returned frame
    has two columns whose names are kept for compatibility with callers:

    * `home_games`    -- total games (home + away) of the row's `HomeTeamNameES`
    * `visitor_games` -- total games (home + away) of the row's `VisitorTeamNameES`

    Both lookups are inner joins (the polars default).

    Args:
        data: Frame with `gameId`, `HomeTeamNameES` and `VisitorTeamNameES`.

    Returns:
        `data` with the `home_games` / `visitor_games` columns described above.
    """
    home_key = 'HomeTeamNameES'
    away_key = 'VisitorTeamNameES'

    # Distinct games per home team / per visiting team, broadcast to the rows.
    counted = data.with_columns(
        pl.col('gameId').unique().len().over(home_key).alias('home_games'),
        pl.col('gameId').unique().len().over(away_key).alias('visitor_games'),
    )

    # One row per team for each role.
    as_host = (
        counted
        .group_by(home_key)
        .first()
        .select(home_key, 'home_games')
        .rename({home_key: 'team'})
    )
    as_guest = (
        counted
        .group_by(away_key)
        .first()
        .select(away_key, 'visitor_games')
        .rename({away_key: 'team'})
    )

    # A full join keeps teams that only ever appear in one role; their missing
    # count becomes 0 and their name is taken from whichever side has it.
    totals = (
        as_host
        .join(as_guest, on='team', how='full')
        .fill_null(0)
        .with_columns(
            (pl.col('home_games') + pl.col('visitor_games')).alias('games'),
            pl.coalesce(pl.col('team'), pl.col('team_right')).alias('team'),
        )
    )
    games_by_team = totals.select('team', 'games')

    # Replace the per-role counts with the per-team totals.
    without_counts = counted.drop('home_games', 'visitor_games')
    with_home_total = without_counts.join(
        games_by_team.rename({'games': 'home_games'}),
        left_on=home_key,
        right_on='team',
    )
    return with_home_total.join(
        games_by_team.rename({'games': 'visitor_games'}),
        left_on=away_key,
        right_on='team',
    )
def compute_pitch_stats(data, player_type, pitch_class_type, min_pitches=1, pitcher_lr='both', batter_lr='both', group_by_team=False):
    """Aggregate per-pitch-type statistics for every pitcher or batter.

    One output row is produced per (player[, team], pitch type) group.

    Args:
        data: Pitch-level frame (one row per pitch).
        player_type: 'pitcher' or 'batter'; selects `pitId` or `batId` as the
            player key.
        pitch_class_type: 'specific' groups by `ballKind_code`; 'general'
            groups by `general_ballKind_code`. In the 'general' case the
            grouping columns are renamed to `ballKind_code` / `ballKind` in
            the output.
        min_pitches: Minimum `count` for a row to be flagged `qualified`.
        pitcher_lr: 'both', 'l' or 'r'; filters on `pitLR`.
        batter_lr: 'both', 'l' or 'r'; filters on `batLR`.
        group_by_team: When True, the player's team column is added to the
            grouping key.

    Returns:
        Frame with velocity, plate-discipline, location and batted-ball
        columns, `usage`, `qualified`, and one `<stat>_pctl` column per ranked
        stat, sorted by player id and then by descending `count`.
    """
    assert pitcher_lr in ('both', 'l', 'r')
    assert batter_lr in ('both', 'l', 'r')
    assert player_type in ('pitcher', 'batter')
    assert pitch_class_type in ('general', 'specific')

    pitching = player_type == 'pitcher'
    specific = pitch_class_type == 'specific'

    # handedness filters
    if pitcher_lr != 'both':
        data = data.filter(pl.col('pitLR') == pitcher_lr)
    if batter_lr != 'both':
        data = data.filter(pl.col('batLR') == batter_lr)

    # col names
    id_cols = ['pitId' if pitching else 'batId']
    if group_by_team:
        id_cols.append('pitcher_team_name_short' if pitching else 'batter_team_name_short')
    name_col = 'pitcher_name' if pitching else 'batter_name'
    pitch_col = 'ballKind_code' if specific else 'general_ballKind_code'
    pitch_name_col = 'ballKind' if specific else 'general_ballKind'

    grouped = (
        data
        .with_columns((pl.col('ballSpeed') / 1.609).round(1).alias('mph'))
        .group_by(*id_cols, pitch_col)
        .agg(
            pl.first(name_col),
            pl.col('pitLR').first().str.to_uppercase().alias('Throws'),
            *([pl.first('general_ballKind')] if specific else []),
            pl.first(pitch_name_col),
            pl.len().alias('count'),
            # averages only use rows passing the module-level `valid_pitch` predicate
            pl.when(valid_pitch).then('ballSpeed').mean().alias('Avg KPH'),
            pl.col('ballSpeed').max().alias('Max KPH'),
            pl.when(valid_pitch).then('mph').mean().round(1).alias('Avg MPH'),
            pl.col('mph').max().alias('Max MPH'),
            # list of {batType, proportion} structs; pivoted into columns below
            pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True),
            (pl.col('swing').sum() / pl.col('pitch').sum()).alias('Swing%'),
            ((pl.col('swing') & pl.col('zone')).sum() / pl.col('pitch').sum()).alias('Z-Swing%'),
            ((pl.col('swing') & ~pl.col('zone')).sum() / pl.col('pitch').sum()).alias('Chase%'),
            ((pl.col('swing') & ~pl.col('whiff')).sum() / pl.col('swing').sum()).alias('Contact%'),
            ((pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum() / (pl.col('zone') & pl.col('swing')).sum()).alias('Z-Contact%'),
            ((~pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum() / (~pl.col('zone') & pl.col('swing')).sum()).alias('O-Contact%'),
            (pl.col('whiff').sum() / pl.col('swing').sum()).alias('Whiff%'),
            (pl.col('whiff').sum() / pl.col('pitch').sum()).alias('SwStr%'),
            (pl.col('csw').sum() / pl.col('pitch').sum()).alias('CSW%'),
            (pl.col('zone').sum() / pl.col('pitch').sum()).alias('Zone%'),
            (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') < 0).otherwise(pl.col('x') > 0)).mean().alias('Glove%'),
            (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') >= 0).otherwise(pl.col('x') <= 0)).mean().alias('Arm%'),
            (pl.col('y') > 125).mean().alias('High%'),
            (pl.col('y') <= 125).mean().alias('Low%'),
            (pl.col('x').is_between(-20, 20) & pl.col('y').is_between(100, 100 + 50)).mean().alias('MM%'),
        )
        .with_columns(
            # Share of the group's own pitches. Windowing over the grouping key
            # (not a hard-coded 'pitId') keeps this working for batters, whose
            # grouped frame has no 'pitId' column, and keeps usage summing to 1
            # per (player, team) when group_by_team is set.
            (pl.col('count') / pl.sum('count').over(*id_cols)).alias('usage'),
            (pl.col('count') >= min_pitches).alias('qualified'),
        )
        .explode('batType')
        .unnest('batType')
        .pivot(on='batType', values='proportion')
        .fill_null(0)
    )

    # The pivot only creates a column for batType values present in the data
    # (plus 'null' for groups without any batted ball). Add the absent ones as
    # 0.0 so the sums below never reference a missing column.
    batted_ball_codes = ('G', 'F', 'B', 'P', 'L')
    absent = [code for code in batted_ball_codes if code not in grouped.columns]
    if absent:
        grouped = grouped.with_columns(pl.lit(0.0).alias(code) for code in absent)
    helper_cols = [col for col in (*batted_ball_codes, 'null') if col in grouped.columns]

    def ranks_descending(stat):
        # stats ranked with descending=True
        return stat in ['FB%', 'LD%'] or 'Contact%' in stat

    ranked_stats = [
        'Avg KPH', 'Max KPH', 'Avg MPH', 'Max MPH',
        'Swing%', 'Z-Swing%', 'Chase%', 'Contact%', 'Z-Contact%', 'O-Contact%',
        'SwStr%', 'Whiff%', 'CSW%', 'GB%', 'FB%', 'LD%', 'Zone%',
    ]

    pitch_stats = (
        grouped
        .with_columns(
            (pl.col('G') + pl.col('B')).alias('GB%'),
            (pl.col('F') + pl.col('P')).alias('FB%'),
            pl.col('L').alias('LD%').round(2),
        )
        .drop(helper_cols)
        .with_columns(
            # rank among qualified rows divided by the number of qualified rows
            # NOTE(review): ranks span every row of the frame, i.e. all pitch
            # types together -- confirm that is the intended comparison group.
            (
                pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=ranks_descending(stat))
                / pl.when(pl.col('qualified')).then(pl.col(stat)).count()
            ).alias(f'{stat}_pctl')
            for stat in ranked_stats
        )
        .rename({} if specific else {pitch_col: 'ballKind_code', pitch_name_col: 'ballKind'})
        .sort(id_cols[0], 'count', descending=[False, True])
    )
    return pitch_stats
def compute_player_stats(data, player_type, qual='qualified', pitcher_lr='both', batter_lr='both', group_by_team=False):
assert pitcher_lr in ('both', 'l', 'r')
assert batter_lr in ('both', 'l', 'r')
assert player_type in ('pitcher', 'batter', 'team pitching', 'team batting')
# pitching or batting, player or team
pitching = player_type in ('pitcher', 'team pitching')
team = player_type in ('team pitching', 'team batting')
# handedness filters
if pitcher_lr != 'both':
data = data.filter(pl.col('pitLR') == pitcher_lr)
if batter_lr != 'both':
data = data.filter(pl.col('batLR') == batter_lr)
if pitching:
over_col = 'pitId' if not team else 'pitcher_team_name_short'
else:
over_col = 'batId' if not team else 'batter_team_name_short'
data = (
compute_team_games(data)
.with_columns(
pl.when(pl.col('half_inning').str.ends_with('1')).then('home_games').otherwise('visitor_games').first().over('pitId').alias('games'),
# pl.col('inning_code').unique().len().over(over_col).alias('IP'),
(pl.col('bso').struct.field('o').cast(pl.Int32) - pl.col('beforeBso').struct.field('o').cast(pl.Int32)).sum().mul(1/3).over(over_col).alias('IP'),
pl.col('pa_code').unique().len().over(over_col).alias('PA'),
# pl.col('presult').is_in(verify_and_return_presult([
# 'Groundout', 'Flyout', 'Lineout', 'Groundout (Double play)',
# 'Foul fly', 'Foul line (?)',
# 'Sacrifice bunt', 'Sacrifice fly',
# "Fielder's choice", "Sacrifice fielder's choice",
# 'Bunt strikeout', 'Swinging strikeout', 'Looking strikeout'
# ])).sum().over('pitId').mul(1/3).alias('IP')
)
)
# qualifiers
qualified_factor = 1 if pitching else 3.1
qual_col = 'IP' if pitching else 'PA'
if qual == 'qualified':
data = data.with_columns((pl.col(qual_col) >= qualified_factor * pl.col('games')).alias('qualified'))
else:
data = data.with_columns((pl.col(qual_col) >= qual).alias('qualified'))
# percentile ascending/descending
if pitching:
stat_descending_pctl = lambda stat: stat in ['BB%', 'FB%', 'LD%', 'Z-Swing%'] or 'Contact%' in stat
else:
stat_descending_pctl = lambda stat: not (stat in ['BB%', 'FB%', 'LD%', 'Swing%', 'Z-Swing%'] or 'Contact%' in stat)
# col names
match player_type:
case 'pitcher':
id_cols = ['pitId']
name_col = 'pitcher_name'
case 'batter':
id_cols = ['batId']
name_col = 'batter_name'
case _:
id_cols = []
name_col = None
team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short'
if group_by_team or team:
id_cols.append(team_col)
handedness_col = 'pitLR' if pitching else 'batLR'
new_handedness_col = 'Throws' if pitching else 'Bats'
player_stats = (
data
.with_columns(pl.when(pl.col('general_ballKind_code').is_in(['4S', 'FC', 'SI'])).then(pl.when(valid_pitch).then('ballSpeed').mean().over('pitId', 'general_ballKind_code')).mul(1/1.609).round(1).alias('FB Velo'))
.group_by(id_cols)
.agg(
*([pl.col(name_col).first()] if not team else []),
*([] if group_by_team or team else [pl.col(team_col).last()]),
*(
[pl.col(handedness_col).first().str.to_uppercase().alias(new_handedness_col) ]
if not (team and ((pitcher_lr == 'both') if pitching else (batter_lr == 'both')))
else []
),
pl.col('IP').first(),
pl.col('PA').first(),
pl.col('FB Velo').max(),
(pl.when(pl.col('presult').str.contains('strikeout')).then(1).otherwise(0).sum() / pl.col('pa_code').unique().len()).alias('K%'),
(pl.when(pl.col('presult') == 'Walk').then(1).otherwise(0).sum() / pl.col('pa_code').unique().len()).alias('BB%'),
pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True),
(pl.col('swing').sum() / pl.col('pitch').sum()).alias('Swing%'),
((pl.col('swing') & pl.col('zone')).sum() / pl.col('pitch').sum()).alias('Z-Swing%'),
((pl.col('swing') & ~pl.col('zone')).sum() / pl.col('pitch').sum()).alias('Chase%'),
((pl.col('swing') & ~pl.col('whiff')).sum()/pl.col('swing').sum()).alias('Contact%'),
((pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum()/(pl.col('zone') & pl.col('swing')).sum()).alias('Z-Contact%'),
((~pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum()/(~pl.col('zone') & pl.col('swing')).sum()).alias('O-Contact%'),
(pl.col('whiff').sum() / pl.col('swing').sum()).alias('Whiff%'),
(pl.col('whiff').sum() / pl.col('pitch').sum()).alias('SwStr%'),
(pl.col('csw').sum() / pl.col('pitch').sum()).alias('CSW%'),
(pl.col('zone').sum() / pl.col('pitch').sum()).alias('Zone%'),
(pl.when(pl.col('pitLR') == 'r').then(pl.col('x') < 0).otherwise(pl.col('x') > 0)).mean().alias('Glove%'),
(pl.when(pl.col('pitLR') == 'r').then(pl.col('x') >= 0).otherwise(pl.col('x') <= 0)).mean().alias('Arm%'),
(pl.col('y') > 125).mean().alias('High%'),
(pl.col('y') <= 125).mean().alias('Low%'),
(pl.col('x').is_between(-20, 20) & pl.col('y').is_between(100, 100+50)).mean().alias('MM%'),
pl.first('qualified')
)
.explode('batType')
.unnest('batType')
.pivot(on='batType', values='proportion')
.fill_null(0)
.with_columns(
(pl.col('G') + pl.col('B')).alias('GB%'),
(pl.col('F') + pl.col('P')).alias('FB%'),
pl.col('L').alias('LD%'),
)
.drop('G', 'F', 'B', 'P', 'L')
.with_columns(
(pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=stat_descending_pctl(stat))/pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl')
for stat in ['FB Velo', 'K%', 'BB%', 'Swing%', 'Z-Swing%', 'Chase%', 'Contact%', 'Z-Contact%', 'O-Contact%', 'SwStr%', 'Whiff%', 'CSW%', 'GB%', 'FB%', 'LD%', 'Zone%']
)
.sort(qual_col, descending=True)
)
return player_stats
def get_pitcher_stats(id, lr='both', game_kind=None, start_date=None, end_date=None, min_ip=1, min_pitches=1, pitch_class_type='specific'):
    """Collect the stat tables and raw pitch locations for one pitcher.

    The stat tables are computed over the whole date/game-kind filtered data
    set and only then narrowed to the requested pitcher, so the `_pctl`
    columns they carry are ranked against every row of those tables.

    Args:
        id: Value matched against the `pitId` column.
        lr: 'both', 'l' or 'r'; restricts the data by batter handedness (`batLR`).
        game_kind: Optional `coarse_game_kind` filter.
        start_date: Optional inclusive lower bound on `date`.
        end_date: Optional inclusive upper bound on `date`.
        min_ip: Minimum IP for the pitcher row to be flagged `qualified`.
        min_pitches: Minimum count for a pitch-type row to be flagged `qualified`.
        pitch_class_type: 'specific' or 'general' pitch classification.

    Returns:
        SimpleNamespace with `pitcher_stats`, `pitch_stats` and `pitch_shapes`.
    """
    league_data = filter_data_by_date_and_game_kind(
        data_df,
        start_date=start_date,
        end_date=end_date,
        game_kind=game_kind,
    )
    is_this_pitcher = pl.col('pitId') == id

    # per-pitch-type table
    league_pitch_stats = compute_pitch_stats(
        league_data,
        player_type='pitcher',
        pitch_class_type=pitch_class_type,
        min_pitches=min_pitches,
        batter_lr=lr,
        group_by_team=False,
    )
    pitch_stats = league_pitch_stats.filter(is_this_pitcher)

    # raw valid pitches thrown by this pitcher, with speed also in mph
    shape_rows = league_data
    if lr != 'both':
        shape_rows = shape_rows.filter(pl.col('batLR') == lr)
    pitch_shapes = (
        shape_rows
        .filter(is_this_pitcher & valid_pitch)
        .select('pitId', 'general_ballKind_code', 'ballKind_code', 'ballSpeed', 'x', 'y')
        .with_columns((pl.col('ballSpeed') / 1.609).alias('ballSpeed_mph'))
    )

    # season-level table
    league_pitcher_stats = compute_player_stats(
        league_data,
        player_type='pitcher',
        qual=min_ip,
        batter_lr=lr,
        group_by_team=False,
    )
    pitcher_stats = league_pitcher_stats.filter(is_this_pitcher)

    return SimpleNamespace(
        pitcher_stats=pitcher_stats,
        pitch_stats=pitch_stats,
        pitch_shapes=pitch_shapes,
    )