Implement NHL API integration with nhlpy library

Added complete implementation of NHL API data adapters:

Player Adapter:
- get_player_by_id: Retrieves player info from career stats
- get_players_by_team: Fetches full team roster (forwards, defensemen, goalies)
- get_skater_stats: Aggregates current season skater statistics from game logs
- get_goalie_stats: Aggregates current season goalie statistics from game logs
- Data transformation utilities for roster and player data

Team Adapter:
- get_all_teams: Retrieves all NHL teams with division/conference info
- get_team_by_id: Looks up team by ID or abbreviation
- get_teams_by_division: Filters teams by division
- get_teams_by_conference: Filters teams by conference
- Data transformation for team entities

Technical Details:
- Corrected package name from nhl-api-py to nhlpy in requirements
- Implemented proper error handling with logging
- Dynamic season calculation based on current date
- Stats aggregation from game log data for accurate totals
- Proper type transformations between API responses and domain entities

Note: Player search functionality marked as not implemented due to API limitations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Michael Simard
2025-11-23 17:22:21 -06:00
parent 337a6377de
commit 6d8d51f698
2 changed files with 378 additions and 32 deletions

View File

@@ -10,7 +10,7 @@ asyncpg==0.30.0
alembic==1.14.0 alembic==1.14.0
# External APIs # External APIs
nhl-api-py==1.1.0 nhlpy==1.0.3
yfpy==14.1.1 yfpy==14.1.1
# Utilities # Utilities

View File

@@ -1,12 +1,15 @@
"""NHL API adapter implementation.""" """NHL API adapter implementation."""
from typing import List, Optional from typing import List, Optional, Any, Dict
from datetime import datetime from datetime import datetime
import logging
from nhl_api import NHLClient from nhlpy import NHLClient
from src.domain.entities import Player, SkaterStats, GoalieStats, NHLTeam from src.domain.entities import Player, SkaterStats, GoalieStats, NHLTeam
from src.domain.repositories import PlayerRepository, TeamRepository from src.domain.repositories import PlayerRepository, TeamRepository
logger = logging.getLogger(__name__)
class NHLPlayerAdapter(PlayerRepository): class NHLPlayerAdapter(PlayerRepository):
"""Adapter for NHL API player data access.""" """Adapter for NHL API player data access."""
@@ -18,21 +21,63 @@ class NHLPlayerAdapter(PlayerRepository):
async def get_player_by_id(self, player_id: str) -> Optional[Player]: async def get_player_by_id(self, player_id: str) -> Optional[Player]:
"""Retrieves a player by their unique identifier.""" """Retrieves a player by their unique identifier."""
try: try:
# TODO: Implement actual NHL API calls # Note: nhlpy does not have direct player lookup by ID
# This is a placeholder for the actual implementation # We need to get player info from team rosters or stats
# The nhl-api-py library will be used here # For now, we will attempt to get it from career stats
pass career_stats = self.client.stats.player_career_stats(player_id=player_id)
if not career_stats or not career_stats.get("seasonTotals"):
return None
# Extract player info from career stats response
player_info = career_stats.get("playerStatsNow", {})
return self._transform_player_data(player_id, player_info)
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error fetching player {player_id}: {e}")
return None return None
async def get_players_by_team(self, team_id: str) -> List[Player]: async def get_players_by_team(self, team_id: str) -> List[Player]:
"""Retrieves all players for a specific team.""" """Retrieves all players for a specific team."""
try: try:
# TODO: Implement actual NHL API calls # Get current season (format: "20242025")
pass current_year = datetime.now().year
current_month = datetime.now().month
if current_month < 9: # Before September, use previous season
season = f"{current_year - 1}{current_year}"
else:
season = f"{current_year}{current_year + 1}"
# Get team roster using team abbreviation
roster_data = self.client.teams.team_roster(
team_abbr=team_id, season=season
)
players = []
# Process forwards
for forward in roster_data.get("forwards", []):
player = self._transform_roster_player(forward, "F", team_id)
if player:
players.append(player)
# Process defensemen
for defenseman in roster_data.get("defensemen", []):
player = self._transform_roster_player(defenseman, "D", team_id)
if player:
players.append(player)
# Process goalies
for goalie in roster_data.get("goalies", []):
player = self._transform_roster_player(goalie, "G", team_id)
if player:
players.append(player)
return players
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error fetching players for team {team_id}: {e}")
return [] return []
async def search_players( async def search_players(
@@ -40,28 +85,286 @@ class NHLPlayerAdapter(PlayerRepository):
) -> List[Player]: ) -> List[Player]:
"""Searches for players by name and optionally by position.""" """Searches for players by name and optionally by position."""
try: try:
# TODO: Implement actual NHL API calls # Note: The NHL API does not have a direct search endpoint
pass # This would require fetching all players or maintaining a local database
# For now, we will return an empty list and mark as not implemented
logger.warning("Player search not implemented - NHL API lacks search endpoint")
return []
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error searching players: {e}")
return [] return []
async def get_skater_stats(self, player_id: str) -> Optional[SkaterStats]: async def get_skater_stats(self, player_id: str) -> Optional[SkaterStats]:
"""Retrieves current season statistics for a skater.""" """Retrieves current season statistics for a skater."""
try: try:
# TODO: Implement actual NHL API calls # Get current season
pass current_year = datetime.now().year
current_month = datetime.now().month
if current_month < 9:
season_id = f"{current_year - 1}{current_year}"
else:
season_id = f"{current_year}{current_year + 1}"
# Get game log for current season (game_type=2 is regular season)
game_log = self.client.stats.player_game_log(
player_id=player_id, season_id=season_id, game_type=2
)
if not game_log or "gameLog" not in game_log:
return None
# Aggregate stats from game log
return self._aggregate_skater_stats(player_id, game_log["gameLog"])
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error fetching skater stats for player {player_id}: {e}")
return None return None
async def get_goalie_stats(self, player_id: str) -> Optional[GoalieStats]: async def get_goalie_stats(self, player_id: str) -> Optional[GoalieStats]:
"""Retrieves current season statistics for a goalie.""" """Retrieves current season statistics for a goalie."""
try: try:
# TODO: Implement actual NHL API calls # Get current season
pass current_year = datetime.now().year
current_month = datetime.now().month
if current_month < 9:
season_id = f"{current_year - 1}{current_year}"
else:
season_id = f"{current_year}{current_year + 1}"
# Get game log for current season (game_type=2 is regular season)
game_log = self.client.stats.player_game_log(
player_id=player_id, season_id=season_id, game_type=2
)
if not game_log or "gameLog" not in game_log:
return None
# Aggregate stats from game log
return self._aggregate_goalie_stats(player_id, game_log["gameLog"])
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error fetching goalie stats for player {player_id}: {e}")
return None
def _transform_roster_player(
self, player_data: Dict[str, Any], position: str, team_id: str
) -> Optional[Player]:
"""Transforms NHL API roster player data to domain Player entity."""
try:
first_name = player_data.get("firstName", {}).get("default", "")
last_name = player_data.get("lastName", {}).get("default", "")
# Parse birth date if available
birth_date = None
if "birthDate" in player_data:
try:
birth_date = datetime.strptime(
player_data["birthDate"], "%Y-%m-%d"
)
except ValueError:
pass
return Player(
id=str(player_data.get("id", "")),
first_name=first_name,
last_name=last_name,
jersey_number=player_data.get("sweaterNumber"),
position=position,
team_id=team_id,
is_active=True,
birth_date=birth_date,
height_inches=player_data.get("heightInInches"),
weight_pounds=player_data.get("weightInPounds"),
)
except Exception as e:
logger.error(f"Error transforming roster player data: {e}")
return None
def _transform_player_data(
self, player_id: str, player_data: Dict[str, Any]
) -> Optional[Player]:
"""Transforms NHL API player data to domain Player entity."""
try:
# This is a simplified transformation from career stats data
# In a real implementation, we would need more complete player info
return Player(
id=player_id,
first_name=player_data.get("firstName", ""),
last_name=player_data.get("lastName", ""),
jersey_number=None,
position=player_data.get("position", ""),
team_id=str(player_data.get("teamId", "")),
is_active=True,
birth_date=None,
height_inches=None,
weight_pounds=None,
)
except Exception as e:
logger.error(f"Error transforming player data: {e}")
return None
def _aggregate_skater_stats(
self, player_id: str, game_log: List[Dict[str, Any]]
) -> Optional[SkaterStats]:
"""Aggregates skater statistics from game log data."""
try:
if not game_log:
return None
# Initialize totals
games_played = len(game_log)
goals = 0
assists = 0
points = 0
plus_minus = 0
penalty_minutes = 0
shots = 0
powerplay_goals = 0
powerplay_points = 0
shorthanded_goals = 0
game_winning_goals = 0
hits = 0
blocked_shots = 0
total_toi = 0.0
total_faceoffs_won = 0
total_faceoffs = 0
# Aggregate from game log
for game in game_log:
goals += game.get("goals", 0)
assists += game.get("assists", 0)
points += game.get("points", 0)
plus_minus += game.get("plusMinus", 0)
penalty_minutes += game.get("pim", 0)
shots += game.get("shots", 0)
powerplay_goals += game.get("powerPlayGoals", 0)
powerplay_points += game.get("powerPlayPoints", 0)
shorthanded_goals += game.get("shorthandedGoals", 0)
game_winning_goals += game.get("gameWinningGoals", 0)
hits += game.get("hits", 0)
blocked_shots += game.get("blockedShots", 0)
# Parse time on ice (format: "MM:SS")
toi_str = game.get("toi", "0:00")
if toi_str and ":" in toi_str:
parts = toi_str.split(":")
total_toi += int(parts[0]) + int(parts[1]) / 60.0
# Faceoffs
total_faceoffs_won += game.get("faceoffWinningPctg", 0) * game.get(
"faceoffs", 0
)
total_faceoffs += game.get("faceoffs", 0)
# Calculate averages and percentages
shooting_percentage = (goals / shots * 100.0) if shots > 0 else 0.0
time_on_ice_per_game = total_toi / games_played if games_played > 0 else 0.0
faceoff_percentage = (
(total_faceoffs_won / total_faceoffs * 100.0)
if total_faceoffs > 0
else None
)
return SkaterStats(
player_id=player_id,
games_played=games_played,
goals=goals,
assists=assists,
points=points,
plus_minus=plus_minus,
penalty_minutes=penalty_minutes,
shots=shots,
shooting_percentage=shooting_percentage,
time_on_ice_per_game=time_on_ice_per_game,
powerplay_goals=powerplay_goals,
powerplay_points=powerplay_points,
shorthanded_goals=shorthanded_goals,
game_winning_goals=game_winning_goals,
faceoff_percentage=faceoff_percentage,
hits=hits,
blocked_shots=blocked_shots,
updated_at=datetime.now(),
)
except Exception as e:
logger.error(f"Error aggregating skater stats: {e}")
return None
def _aggregate_goalie_stats(
self, player_id: str, game_log: List[Dict[str, Any]]
) -> Optional[GoalieStats]:
"""Aggregates goalie statistics from game log data."""
try:
if not game_log:
return None
# Initialize totals
games_played = 0
games_started = 0
wins = 0
losses = 0
overtime_losses = 0
saves = 0
shots_against = 0
goals_against = 0
shutouts = 0
total_toi = 0.0
# Aggregate from game log
for game in game_log:
# Only count games where goalie actually played
if game.get("toi", "0:00") != "0:00":
games_played += 1
if game.get("started", 0) == 1:
games_started += 1
decision = game.get("decision", "")
if decision == "W":
wins += 1
elif decision == "L":
losses += 1
elif decision == "O":
overtime_losses += 1
saves += game.get("saves", 0)
shots_against += game.get("shotsAgainst", 0)
goals_against += game.get("goalsAgainst", 0)
if game.get("shutouts", 0) > 0:
shutouts += 1
# Parse time on ice
toi_str = game.get("toi", "0:00")
if toi_str and ":" in toi_str:
parts = toi_str.split(":")
total_toi += int(parts[0]) + int(parts[1]) / 60.0
# Calculate percentages and averages
save_percentage = (
(saves / shots_against * 100.0) if shots_against > 0 else 0.0
)
goals_against_average = (
(goals_against / (total_toi / 60.0)) if total_toi > 0 else 0.0
)
return GoalieStats(
player_id=player_id,
games_played=games_played,
games_started=games_started,
wins=wins,
losses=losses,
overtime_losses=overtime_losses,
saves=saves,
shots_against=shots_against,
goals_against=goals_against,
save_percentage=save_percentage,
goals_against_average=goals_against_average,
shutouts=shutouts,
time_on_ice=total_toi,
updated_at=datetime.now(),
)
except Exception as e:
logger.error(f"Error aggregating goalie stats: {e}")
return None return None
@@ -75,35 +378,78 @@ class NHLTeamAdapter(TeamRepository):
async def get_team_by_id(self, team_id: str) -> Optional[NHLTeam]: async def get_team_by_id(self, team_id: str) -> Optional[NHLTeam]:
"""Retrieves a team by its unique identifier.""" """Retrieves a team by its unique identifier."""
try: try:
# TODO: Implement actual NHL API calls # Get all teams and filter by ID (or abbreviation)
pass all_teams = await self.get_all_teams()
for team in all_teams:
if team.id == team_id or team.abbreviation == team_id:
return team
return None
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error fetching team {team_id}: {e}")
return None return None
async def get_all_teams(self) -> List[NHLTeam]: async def get_all_teams(self) -> List[NHLTeam]:
"""Retrieves all NHL teams.""" """Retrieves all NHL teams."""
try: try:
# TODO: Implement actual NHL API calls teams_data = self.client.teams.teams()
pass
if not teams_data or "data" not in teams_data:
return []
teams = []
for team_data in teams_data["data"]:
team = self._transform_team_data(team_data)
if team:
teams.append(team)
return teams
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error fetching all teams: {e}")
return [] return []
async def get_teams_by_division(self, division: str) -> List[NHLTeam]: async def get_teams_by_division(self, division: str) -> List[NHLTeam]:
"""Retrieves all teams in a specific division.""" """Retrieves all teams in a specific division."""
try: try:
# TODO: Implement actual NHL API calls all_teams = await self.get_all_teams()
pass return [team for team in all_teams if team.division.lower() == division.lower()]
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error fetching teams by division {division}: {e}")
return [] return []
async def get_teams_by_conference(self, conference: str) -> List[NHLTeam]: async def get_teams_by_conference(self, conference: str) -> List[NHLTeam]:
"""Retrieves all teams in a specific conference.""" """Retrieves all teams in a specific conference."""
try: try:
# TODO: Implement actual NHL API calls all_teams = await self.get_all_teams()
pass return [team for team in all_teams if team.conference.lower() == conference.lower()]
except Exception as e: except Exception as e:
# Log error appropriately logger.error(f"Error fetching teams by conference {conference}: {e}")
return [] return []
def _transform_team_data(self, team_data: Dict[str, Any]) -> Optional[NHLTeam]:
"""Transforms NHL API team data to domain NHLTeam entity."""
try:
team_name = team_data.get("fullName", "") or team_data.get("teamName", {}).get("default", "")
city_name = team_data.get("placeName", {}).get("default", "")
# Handle team name that might include city
if not city_name and " " in team_name:
parts = team_name.rsplit(" ", 1)
if len(parts) == 2:
city_name = parts[0]
team_name = parts[1]
return NHLTeam(
id=str(team_data.get("id", "")),
name=team_name,
abbreviation=team_data.get("triCode", ""),
city=city_name,
division=team_data.get("divisionName", ""),
conference=team_data.get("conferenceName", ""),
venue_name=None, # Not provided in basic team data
is_active=team_data.get("isActive", True),
)
except Exception as e:
logger.error(f"Error transforming team data: {e}")
return None