Python Code here...

本帖于 2026-01-06 21:48:59 时间, 由普通用户 slow_quick 编辑
回答: 计算社安退休金的Python codeslow_quick2026-01-06 19:29:45


import os
import datetime
import pandas as pd
from lxml import etree
#import xlwings as xw
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium_stealth import stealth
from selenium.webdriver.common.by import By
from io import StringIO
from collections.abc import Callable

def read_ssa_statement(xml_path: str) -> dict:
    """
    returns a dictionary with 3 key/value:
        UserInformation: dictionary of keys Name and DateOfBirth
        Earnings: pandas.DataFrame with Year as index, FicaEarnings and MedicareEarnings as columns
            the last row might be -1 which indicates data are not available yet and will be removed.
        TotalTaxPaid: dictionary of keys:
            FicaTaxTotalEmployer, FicaTaxTotalIndividual, MedicareTaxTotalEmployer, MedicareTaxTotalIndividual
    """
    tree = etree.parse(xml_path)
    root = tree.getroot()
    ns = root.nsmap  # {'osss': 'http://ssa.gov/osss/schemas/2.0'}

    e = root.find(".//osss:UserInformation", namespaces=ns)
    user_info = {
        "Name": e.findtext("osss:Name", namespaces=ns),
        "DateOfBirth": datetime.date.fromisoformat(e.findtext("osss:DateOfBirth", namespaces=ns))
    }
    
    rows = []
    for e in root.findall(".//osss:Earnings", namespaces=ns):
        year = e.get("startYear")

        fica = e.findtext("osss:FicaEarnings", namespaces=ns)
        medicare = e.findtext("osss:MedicareEarnings", namespaces=ns)

        rows.append({
            "Year": int(year),
            "FicaEarnings": float(fica) if fica else 0.0,
            "MedicareEarnings": float(medicare) if medicare else 0.0
        })

    earnings_hist = (
        pd.DataFrame(rows)
        .sort_values("Year")
        .reset_index(drop=True)
    )

    earnings_hist.set_index('Year', inplace=True)

    if (earnings_hist.iloc[-1, :] < 0).any():
        earnings_hist = earnings_hist.iloc[:-1]
    
    total_tax_paid = {
        'FicaTaxTotalEmployer': float(root.find(".//osss:FicaTaxTotalEmployer", namespaces=ns).text),
        'FicaTaxTotalIndividual': float(root.find(".//osss:FicaTaxTotalIndividual", namespaces=ns).text),
        'MedicareTaxTotalEmployer': float(root.find(".//osss:MedicareTaxTotalEmployer", namespaces=ns).text),
        'MedicareTaxTotalIndividual': float(root.find(".//osss:MedicareTaxTotalIndividual", namespaces=ns).text)
    }

    return {
        'UserInformation': user_info,
        'Earnings': earnings_hist,
        'TotalTaxPaid': total_tax_paid   
    }
    
    
def read_awi(driver) -> pd.DataFrame:    
    url = "https://www.ssa.gov/oact/cola/AWI.html"  # this page includes all AWIs since 1951
    print(f'navigate to {url}')
    driver.get(url)

    print('get AWI (average wage index)...')
    tbl_element = driver.find_element(By.XPATH, '//table[.//*[contains(text(),"National average wage indexing series, 1951-")]]')
    df = pd.read_html(StringIO(tbl_element.get_attribute('outerHTML')))[0].iloc[2:, :-1]

    df.columns = ['Year', 'AWI']
    df.set_index('Year', inplace=True)
    numeric_index = pd.to_numeric(df.index, errors='coerce')
    is_NaN = pd.isna(numeric_index)
    df = df.loc[~is_NaN].astype(float)
    df.index = df.index.astype(int)
    
    return df['AWI'] # pd.Series
    # TODO: add exception handling

def read_q3_average_cpiw(driver) -> pd.Series:
    url = "https://www.ssa.gov/oact/STATS/avgcpi.html"
    print(f'navigate to {url}')
    driver.get(url)

    print('get average Q3 CPI-W by year...')
    tbl_element = driver.find_element(By.XPATH, '//table[.//*[contains(text(),"Average Quarterly, and Average Annual")]]')
    df = pd.read_html(StringIO(tbl_element.get_attribute('outerHTML')))[0]
    df.set_index('Year', inplace=True)
    s = df['Quarter 3'].dropna()
    s.index = s.index.astype(int)
    s.name = 'AvgQ3CPI-W'
    return s.astype(float)

def read_pia_bend_points(driver) -> pd.DataFrame:
    url = "https://www.ssa.gov/oact/cola/bendpoints.html"
    print(f'navigate to {url}')
    driver.get(url)

    print('get PIA formula bend points...')
    tbl_element = driver.find_element(By.XPATH, '//table[.//table[@summary="PIA and family maximum bend points"]]')
    df = pd.read_html(StringIO(tbl_element.get_attribute('outerHTML')))[0].iloc[3:-1,:3]

    df.columns = ['Year', 'First', 'Second']
    df.set_index('Year', inplace=True)
    df.index = df.index.astype(int)

    return df.replace('[$,]' , '', regex=True).astype(float)

# -------------------------------------------------------------------

def calculate_ssa_retirement_benefit(statement_fpath: str):
    ssa_statement = read_ssa_statement(statement_fpath)

    # 1. Set up Chrome options to avoid detection
    options = Options()
    # options.add_argument("--headless=new") # The modern headless argument
    options.add_argument("--incognito")
    # Optional: set a consistent window size for reliable results
    # options.add_argument("--window-size=1920,1080")
    options.add_argument("start-maximized") # Maximize window to mimic typical user behavior
    options.add_experimental_option("excludeSwitches", ["enable-automation"]) # Exclude the automation switch
    options.add_experimental_option('useAutomationExtension', False)
    # You may also want to add a custom, realistic user agent string for an extra layer of customization
    # options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36')
    
    # 2. Initialize the WebDriver
    driver = webdriver.Chrome(options=options)
    
    # 3. Apply selenium_stealth
    stealth(driver,
            languages=["en-US", "en"],
            vendor="Google Inc.",
            platform="Win32",
            webgl_vendor="Intel Inc.",
            renderer="Intel Iris OpenGL Engine",
            fix_hairline=True,
            )
    
    awi = read_awi(driver)
    q3_avg_cpiw = read_q3_average_cpiw(driver)
    pia_bend_points = read_pia_bend_points(driver)

    driver.quit()

    #return ssa_statement, awi, cpiw, pia_bend_points

    # from Q3 Average CPI-W to COLA (cost of living adjustment) index
    cola_idx = q3_avg_cpiw.cummax()  # cola_index can not decrease
    # COLA_IDX[0] = Q3_AVG_CPI-W[0]
    # COLA_IDX[t] = MAX(COLA_IDX[t-1], Q3_AVG_CPI-W[t])
    cola_idx.name = 'COLA_INDEX'

    #calculate benefit
    dob = ssa_statement['UserInformation']['DateOfBirth']

    # 1. year of bith
    yob = ssa_statement['UserInformation']['DateOfBirth'].year

    # 2. year at 60
    y60 = yob + 60
    awi_y60 = awi.loc[y60]

    # 3. year at full retirment age
    # https://www.ssa.gov/retirement/full-retirement-age
    if dob <= datetime.date(1955, 1, 1):
        fra = (66, 0)
    elif dob <= datetime.date(1956, 1, 1):
        fra = (66, 2)
    elif dob <= datetime.date(1957, 1, 1):
        fra = (66, 4)
    elif dob <= datetime.date(1958, 1, 1):
        fra = (66, 6)
    elif dob <= datetime.date(1959, 1, 1):
        fra = (66, 8)
    elif dob <= datetime.date(1960, 1, 1):
        fra = (66, 10)
    else:
        fra = (67, 0)

    yfra = yob + fra[0]
    if dob.month + fra[1] > 12:
        yfra += 1

    print(f'Year at full retirement age: {yfra}')
    
    # 4. indexed earnings
    earn = ssa_statement['Earnings']['FicaEarnings']
    idx_earn = earn.copy()
    idx_earn.columns = ['Idx_Eearn']
    y0 = idx_earn.index[0]

    # inflation adjustment earnings to age 60
    idx_earn.loc[y0:y60] = idx_earn.loc[y0:y60] / awi.loc[y0:y60] * awi_y60

    # no inflation adjusting after 60!  This will be taken care of by COLA

    best_idx_earn = idx_earn.sort_values(ascending=False)
    len_earn = len(best_idx_earn)

    # 5. AIME (average indexed monthly earnings)
    print('calculate AIME (average indexed monthly earnings)')
    aime = best_idx_earn.iloc[0:min(len_earn, 35)].sum()/420
    print(f'aime: {aime}')

    # 6. PIA (primary insurance amount)
    #return pia_bend_points
    y62 = yob + 62  # year at 62
    bpoints = pia_bend_points.loc[y62, :]
    b1 = bpoints['First']
    b2 = bpoints['Second']
    print(f'bend points at age 62 --- b1: {b1}, b2: {b2}')

    print('calculate PIA (primary insurance amount)...')
    pia = (
        min(aime, b1) * 0.90
        + max(min(aime, b2) - b1, 0) * 0.32
        + max(aime - b2, 0) * 0.15
    )
    print(f'PIA: {pia}')

    # New PIA (COLA adjusted)
    print('calculate New PIA (PIA with COLA adjustment from age 60 to now')
    y_benefit = earn.index[-1] + 1  # latest available SS taxable earnings + 1yr
    new_pia = pia * cola_idx.loc[y_benefit - 1] / cola_idx.loc[y60]
    print(f'New PIA: {new_pia}')

    # 这里我就淘浆糊了,你们自己脑补
    # early discount
    
    # delay retirement credit (assuming full retirement age = 66)
    new_pia_w_dlay_credit = new_pia * 1.32 # (70 - 66)*8% = 32%)
    # if FRA = 67 then * (1+24%)
    print(f'new_pia_w_dlay_credit: {new_pia_w_dlay_credit}')   
    
# ------------------------------------------------------------------------

statement_fpath = r'your ss statement xml file path here'

calculate_ssa_retirement_benefit(statement_fpath)

# ------------------------------------------------------------------------
"""
The code only works for my situation, 70+ who begin withdraw at 70

For younger people, you would need to
* extend the earnings to year you will be 70 year old, using the last value
* extend the COLA index to year you will be 69, using the last value
* set the y_benefit to year you will be 70
"""



请您先登陆,再发跟帖!