Chess Data Analysis

Problem Statement

Let’s look at a simple example of collecting some data and analysing it. We will:

  • Build a pipeline to download a player’s game data for the past few months from the chess.com API
  • Use the python-chess package to parse the PGN game data
  • Use pandas to do some basic opening win-rate analysis

Setup

This is a standalone script; Python package requirements are specified in requirements.txt.

See the source code for this example (always review code before running it on your own machine).

Implementation

To collect the data we need, we will use the chess.com API’s monthly multi-game PGN download endpoint, which has the URL format:

https://api.chess.com/pub/player/player-name/games/YYYY/MM/pgn

First, we define a helper function to generate these URLs for the most recent months:

import datetime

from dateutil.relativedelta import relativedelta


def generate_urls_by_month(player: str, num_months: int):
    """Define a series of pgn game resource urls for a player, for num_months recent months."""
    today = datetime.date.today()
    for i in range(num_months):
        d = today - relativedelta(months=i)
        yield f"https://api.chess.com/pub/player/{player}/games/{d.year}/{d.month:02}/pgn"
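
If you’d rather avoid the dateutil dependency, the same month-stepping can be sketched with the standard library alone (`month_urls` here is a hypothetical stand-in for illustration, not part of the example’s source):

```python
import datetime


def month_urls(player: str, num_months: int):
    # Step backwards month by month with plain integer arithmetic,
    # instead of dateutil's relativedelta.
    today = datetime.date.today()
    year, month = today.year, today.month
    for _ in range(num_months):
        yield f"https://api.chess.com/pub/player/{player}/games/{year}/{month:02}/pgn"
        month -= 1
        if month == 0:
            year, month = year - 1, 12
```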

We also need a function to fetch the raw data from each url.

import requests


def fetch_text_data(url: str, session: requests.Session):
    """Fetch text data from a url."""
    r = session.get(url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"})
    return r.text
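
Because this function simply delegates to the session, it can be exercised without touching the network by passing in a stubbed session (a sketch using unittest.mock; the URL and response text here are fabricated):

```python
from unittest import mock


def fetch_text_data(url: str, session):
    """Same shape as above; session is untyped here so a stub can be passed."""
    r = session.get(url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"})
    return r.text


# A Mock stands in for requests.Session: .get() returns an object with a .text attribute.
fake_session = mock.Mock()
fake_session.get.return_value = mock.Mock(text='[Event "Live Chess"]')

text = fetch_text_data("https://api.chess.com/pub/player/hikaru/games/2024/01/pgn", fake_session)
print(text)  # [Event "Live Chess"]
```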

Each PGN dataset consists of data for multiple games. We’ll create a function called read_game_data to extract individual game details as dictionaries.

import io

import chess.pgn


def _clean_opening_name(eco_url: str):
    """Get a rough opening name from the chess.com ECO url."""
    name = eco_url.removeprefix("https://www.chess.com/openings/")
    return " ".join(name.split("-")[:2])


def read_game_data(pgn_text: str, player: str):
    """Read PGN data and generate game details (each PGN contains details for multiple games)."""
    pgn = io.StringIO(pgn_text)
    while (headers := chess.pgn.read_headers(pgn)) is not None:
        color = 'W' if headers["White"].lower() == player else 'B'
        
        if headers["Result"] == "1/2-1/2":
            score = 0.5
        elif (color == 'W' and headers["Result"] == "1-0") or (color == 'B' and headers["Result"] == "0-1"):
            score = 1
        else:
            score = 0
        
        yield {
            "color": color,
            "score": score,
            "opening": _clean_opening_name(headers["ECOUrl"]) 
        }

Finally, we need some logic to handle the data analysis (which we’re keeping deliberately bare-bones). Let’s dump the data into a pandas DataFrame and print a table showing:

  • average score grouped by chess opening
  • filtered to games where the player plays the white pieces
  • ordered by total games played

import typing

import pandas as pd


def build_df(data: typing.Iterable[dict]) -> pd.DataFrame:
    df = pd.DataFrame(data)
    df = df[df["color"] == 'W']
    df = df.groupby("opening").agg(total_games=("score", "count"), average_score=("score", "mean"))
    df = df.sort_values(by="total_games", ascending=False)
    return df
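
On a small hand-made sample (fabricated game dicts, assuming pandas is installed), the aggregation behaves like this:

```python
import typing

import pandas as pd


def build_df(data: typing.Iterable[dict]) -> pd.DataFrame:
    df = pd.DataFrame(data)
    df = df[df["color"] == 'W']
    df = df.groupby("opening").agg(total_games=("score", "count"), average_score=("score", "mean"))
    return df.sort_values(by="total_games", ascending=False)


games = [
    {"color": 'W', "score": 1, "opening": "Vienna Game"},
    {"color": 'W', "score": 0, "opening": "Vienna Game"},
    {"color": 'B', "score": 1, "opening": "Caro Kann"},   # dropped: played as black
    {"color": 'W', "score": 0.5, "opening": "Caro Kann"},
]
df = build_df(games)
print(df)
```

The black-pieces game is filtered out before grouping, so Caro Kann counts a single game.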

All that’s left is to piece everything together.

Note that the Pyper framework hasn’t placed any particular restrictions on the way our ‘business logic’ is implemented. We can use Pyper to simply compose together these logical functions into a concurrent pipeline, with minimal code coupling.

In the pipeline, we will:

  1. Set branch=True for generate_urls_by_month, to allow this task to generate multiple outputs
  2. Create 3 workers for fetch_text_data, so that we can wait on requests concurrently
  3. Set branch=True for read_game_data also, as this generates multiple dictionaries
  4. Let the build_df function consume all output generated by this pipeline

from pyper import task


def main():
    player = "hikaru"
    num_months = 6  # Keep this number low, or add sleeps for etiquette

    with requests.Session() as session:
        run = (
            task(generate_urls_by_month, branch=True)
            | task(
                fetch_text_data,
                workers=3,
                bind=task.bind(session=session))
            | task(
                read_game_data,
                branch=True,
                bind=task.bind(player=player))
            > build_df
        )
        df = run(player, num_months)
        print(df.head(10))

With no more lines of code than it would have taken to define a series of sequential for-loops, we’ve defined a concurrently executable data flow!
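
For comparison, the sequential version of this flow would look something like the sketch below (with hypothetical stubs standing in for the real helpers, so the shape is visible without any network access):

```python
# Hypothetical stubs in place of the real helpers:
def generate_urls_by_month(player, num_months):
    for i in range(num_months):
        yield f"fake-url-{i}"


def fetch_text_data(url, session=None):
    return f"pgn-text-for-{url}"


def read_game_data(pgn_text, player):
    yield {"color": 'W', "score": 1, "opening": "Vienna Game"}


def collect_games_sequentially(player, num_months):
    # One URL at a time, one request at a time: no overlap between
    # waiting on the network and parsing game data.
    games = []
    for url in generate_urls_by_month(player, num_months):
        pgn_text = fetch_text_data(url)
        games.extend(read_game_data(pgn_text, player))
    return games


print(len(collect_games_sequentially("hikaru", 3)))  # 3
```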

We can now run everything to see the result of our analysis:

                      total_games  average_score
opening
Nimzowitsch Larsen            244       0.879098
Closed Sicilian               205       0.924390
Caro Kann                     157       0.882166
Bishops Opening               156       0.900641
French Defense                140       0.846429
Sicilian Defense              127       0.877953
Reti Opening                   97       0.819588
Vienna Game                    71       0.929577
English Opening                61       0.868852
Scandinavian Defense           51       0.862745