import os
import datetime
import pandas as pd
from lxml import etree
#import xlwings as xw
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium_stealth import stealth
from selenium.webdriver.common.by import By
from io import StringIO
#from collections.abc import Callable
def read_ssa_statement(xml_path: str) -> dict:
    """
    Parse an SSA statement XML file into plain Python / pandas objects.

    The returned dictionary has three entries:
      UserInformation: dict with keys Name (str or None) and DateOfBirth
                       (datetime.date parsed from an ISO date string)
      Earnings: pandas.DataFrame indexed by Year (ascending) with float columns
                FicaEarnings and MedicareEarnings; missing or empty values
                become 0.0. If the latest year's row holds a negative value
                (e.g. -1 for "not available yet") that row is dropped.
      TotalTaxPaid: dict of floats keyed by FicaTaxTotalEmployer,
                    FicaTaxTotalIndividual, MedicareTaxTotalEmployer and
                    MedicareTaxTotalIndividual
    """
    root = etree.parse(xml_path).getroot()
    # Prefix -> URI map taken from the document root,
    # e.g. {'osss': 'http://ssa.gov/osss/schemas/2.0'}
    nsmap = root.nsmap

    user_node = root.find(".//osss:UserInformation", namespaces=nsmap)
    full_name = user_node.findtext("osss:Name", namespaces=nsmap)
    birth_text = user_node.findtext("osss:DateOfBirth", namespaces=nsmap)
    user_info = {
        "Name": full_name,
        "DateOfBirth": datetime.date.fromisoformat(birth_text),
    }

    def _amount(node, tag):
        # Absent or empty elements count as zero earnings.
        text = node.findtext(tag, namespaces=nsmap)
        return float(text) if text else 0.0

    records = [
        {
            "Year": int(node.get("startYear")),
            "FicaEarnings": _amount(node, "osss:FicaEarnings"),
            "MedicareEarnings": _amount(node, "osss:MedicareEarnings"),
        }
        for node in root.findall(".//osss:Earnings", namespaces=nsmap)
    ]
    earnings_hist = pd.DataFrame(records).sort_values("Year").set_index("Year")

    # A negative value in the most recent year marks data that is not
    # available yet; drop that row.
    latest_row = earnings_hist.iloc[-1]
    if (latest_row < 0).any():
        earnings_hist = earnings_hist.iloc[:-1]

    tax_fields = (
        "FicaTaxTotalEmployer",
        "FicaTaxTotalIndividual",
        "MedicareTaxTotalEmployer",
        "MedicareTaxTotalIndividual",
    )
    total_tax_paid = {
        field: float(root.find(f".//osss:{field}", namespaces=nsmap).text)
        for field in tax_fields
    }

    return {
        "UserInformation": user_info,
        "Earnings": earnings_hist,
        "TotalTaxPaid": total_tax_paid,
    }
def _expand_year_label(label: str) -> list:
    """Expand a year cell such as "1955" or "1937-50" into a list of int years.

    Returns an empty list when the label cannot be parsed, so callers can
    simply skip the row.
    """
    text = label.strip()
    try:
        if "-" not in text:
            # Go through float so that a year rendered as "1955.0" (a column
            # that pandas upcast to float) is still accepted.
            year = float(text)
            return [int(year)] if year.is_integer() else []
        start_text, end_text = (part.strip() for part in text.split("-", 1))
        start = int(start_text)
        end = int(end_text)
    except ValueError:
        return []

    if len(end_text) == 2:
        # Two-digit end year: borrow the century from the start year
        # ("1937-50" -> 1950) and roll over when the range crosses a
        # century boundary ("1998-02" -> 2002).
        end += (start // 100) * 100
        if end < start:
            end += 100
    return list(range(start, end + 1))


def read_ss_earnings_cap(driver) -> pd.Series:
    """
    Returns a Series indexed by year with the Social Security
    taxable maximum (contribution and benefit base).

    driver: a Selenium WebDriver; it is navigated to the SSA
            "contribution and benefit base" page.

    The first two columns of the scraped tables are read as (year, amount).
    Year ranges such as "1937-50" are expanded to one entry per year. Rows
    whose year or amount cannot be parsed, or whose amount is missing, are
    skipped. When a year appears more than once, the first occurrence wins.
    The result is a float Series named "TaxableMax", sorted by year.
    """
    url = "https://www.ssa.gov/oact/cola/cbb.html"
    print(f"navigate to {url}")
    driver.get(url)
    print("get Social Security taxable maximum (contribution and benefit base)...")

    # Find the main table
    tbl_element = driver.find_element(
        By.XPATH,
        '//table[.//*[contains(text(),"Contribution and benefit bases, 1937-")]]'
    )
    html = tbl_element.get_attribute("outerHTML")
    df_list = pd.read_html(StringIO(html))

    # Skip the first table (header/metadata)
    tmp_df = pd.concat(df_list[1:])

    cleaned_rows = []
    # Positional access to the first two columns: independent of the scraped
    # header text and safe even if column names repeat.
    for raw_year, raw_amount in zip(tmp_df.iloc[:, 0], tmp_df.iloc[:, 1]):
        # Clean amount: remove $, commas, etc.
        amt_str = str(raw_amount).strip().replace("$", "").replace(",", "")
        try:
            amount = float(amt_str)
        except ValueError:
            continue  # skip rows that don't contain numeric amounts
        if pd.isna(amount):
            # float("nan") parses without error, so missing cells must be
            # rejected explicitly.
            continue
        cleaned_rows.extend(
            (year, amount) for year in _expand_year_label(str(raw_year))
        )

    # Build the result: one row per year, first occurrence wins.
    df = pd.DataFrame(cleaned_rows, columns=["Year", "TaxableMax"])
    df = df.drop_duplicates(subset="Year").sort_values("Year").set_index("Year")
    return df["TaxableMax"]
def read_awi(driver) -> pd.Series:
    """
    Returns a Series named 'AWI' indexed by integer year with the national
    average wage index scraped from the SSA AWI page.

    driver: a Selenium WebDriver; it is navigated to the AWI page.

    The first two rows and the last column of the scraped table are
    discarded, and rows whose year cell is not numeric are dropped before
    converting the values to float.
    """
    url = "https://www.ssa.gov/oact/cola/AWI.html"  # this page includes all AWIs since 1951
    print(f'navigate to {url}')
    driver.get(url)
    print('get AWI (average wage index)...')
    tbl_element = driver.find_element(
        By.XPATH,
        '//table[.//*[contains(text(),"National average wage indexing series, 1951-")]]'
    )
    html = tbl_element.get_attribute('outerHTML')

    # Keep everything after the first two rows, minus the last column, which
    # leaves exactly the (year, AWI) pair of columns named below.
    df = pd.read_html(StringIO(html))[0].iloc[2:, :-1]
    df.columns = ['Year', 'AWI']
    df = df.set_index('Year')

    # Drop rows whose year label is not a number (e.g. repeated header rows).
    is_nan = pd.isna(pd.to_numeric(df.index, errors='coerce'))
    df = df.loc[~is_nan].astype(float)
    df.index = df.index.astype(int)
    return df['AWI']
# TODO: add exception handling
def read_q3_average_cpiw(driver) -> pd.Series:
    """
    Returns a float Series named 'AvgQ3CPI-W', indexed by integer year, taken
    from the 'Quarter 3' column of the SSA average CPI table.

    driver: a Selenium WebDriver; it is navigated to the average CPI page.

    Years without a 'Quarter 3' value are dropped.
    """
    url = "https://www.ssa.gov/oact/STATS/avgcpi.html"
    print(f'navigate to {url}')
    driver.get(url)
    print('get average Q3 CPI-W by year...')

    table_xpath = '//table[.//*[contains(text(),"Average Quarterly, and Average Annual")]]'
    table_html = driver.find_element(By.XPATH, table_xpath).get_attribute('outerHTML')

    cpi_table = pd.read_html(StringIO(table_html))[0].set_index('Year')
    q3 = cpi_table['Quarter 3'].dropna()
    q3.index = q3.index.astype(int)
    return q3.rename('AvgQ3CPI-W').astype(float)
def read_pia_bend_points(driver) -> pd.DataFrame:
    """
    Returns a float DataFrame indexed by integer year with columns 'First'
    and 'Second' (the two PIA formula bend points) scraped from the SSA
    bend points page.

    driver: a Selenium WebDriver; it is navigated to the bend points page.

    Only the first three columns are kept; the first three rows and the last
    row of the scraped table are discarded. '$' and ',' are stripped from the
    values before conversion to float.
    """
    url = "https://www.ssa.gov/oact/cola/bendpoints.html"
    print(f'navigate to {url}')
    driver.get(url)
    print('get PIA formula bend points...')

    outer_xpath = '//table[.//table[@summary="PIA and family maximum bend points"]]'
    outer_html = driver.find_element(By.XPATH, outer_xpath).get_attribute('outerHTML')

    first_table = pd.read_html(StringIO(outer_html))[0]
    bend_points = first_table.iloc[3:-1, :3]
    bend_points.columns = ['Year', 'First', 'Second']
    bend_points = bend_points.set_index('Year')
    bend_points.index = bend_points.index.astype(int)
    return bend_points.replace('[$,]', '', regex=True).astype(float)
# -----------------------------------------------------------------------------------------------------
def _full_retirement_age(dob: datetime.date) -> tuple:
    """Return the full retirement age as (years, months) for a date of birth.

    See https://www.ssa.gov/retirement/full-retirement-age. The cut-offs sit
    on January 1 because a person born on January 1 is grouped with the
    previous birth year. Every date of birth up to 1955-01-01 maps to
    (66, 0); lower full retirement ages for earlier birth years are not
    distinguished.
    """
    brackets = (
        (datetime.date(1955, 1, 1), (66, 0)),
        (datetime.date(1956, 1, 1), (66, 2)),
        (datetime.date(1957, 1, 1), (66, 4)),
        (datetime.date(1958, 1, 1), (66, 6)),
        (datetime.date(1959, 1, 1), (66, 8)),
        (datetime.date(1960, 1, 1), (66, 10)),
    )
    for cutoff, fra in brackets:
        if dob <= cutoff:
            return fra
    return (67, 0)


def _scrape_ssa_tables() -> tuple:
    """Scrape the SSA reference tables with Chrome and always close the browser.

    Returns (awi, q3_avg_cpiw, pia_bend_points, taxable_max).
    """
    # 1. Set up Chrome options to avoid detection
    options = Options()
    # options.add_argument("--headless=new")  # enable to run without a visible window
    options.add_argument("--incognito")
    options.add_argument("start-maximized")  # maximize window to mimic typical user behavior
    options.add_experimental_option("excludeSwitches", ["enable-automation"])  # exclude the automation switch
    options.add_experimental_option('useAutomationExtension', False)

    # 2. Initialize the WebDriver
    driver = webdriver.Chrome(options=options)
    try:
        # 3. Apply selenium_stealth
        stealth(
            driver,
            languages=["en-US", "en"],
            vendor="Google Inc.",
            platform="Win32",
            webgl_vendor="Intel Inc.",
            renderer="Intel Iris OpenGL Engine",
            fix_hairline=True,
        )
        awi = read_awi(driver)
        q3_avg_cpiw = read_q3_average_cpiw(driver)
        pia_bend_points = read_pia_bend_points(driver)
        taxable_max = read_ss_earnings_cap(driver)
    finally:
        # Close the browser even when one of the scrapes fails.
        driver.quit()
    return awi, q3_avg_cpiw, pia_bend_points, taxable_max


def _extend_earnings(earn, last_year, mode: str, taxable_max):
    """Return a copy of `earn` with one entry added per year up to `last_year`."""
    extended = earn.copy()
    for year in range(earn.index.max() + 1, last_year + 1):
        if mode == "use_zero":
            extended.loc[year] = 0.0
        elif mode == "use_previous":
            extended.loc[year] = extended.loc[year - 1]
        elif mode == "use_ss_earnings_cap":
            if year not in taxable_max.index:
                raise ValueError(f"Taxable maximum not available for year {year}")
            extended.loc[year] = taxable_max.loc[year]
        else:
            raise ValueError("earnings_extension must be 'use_zero', 'use_previous' or 'use_ss_earnings_cap'")
    return extended


def _primary_insurance_amount(aime, b1, b2):
    """Apply the 90% / 32% / 15% PIA formula to `aime` with bend points b1 < b2."""
    return (
        min(aime, b1) * 0.90
        + max(min(aime, b2) - b1, 0) * 0.32
        + max(aime - b2, 0) * 0.15
    )


def calculate_ssa_retirement_benefit(statement_fpath: str, earnings_extension: str="use_previous"):
    """
    Estimate Social Security retirement benefits from an SSA statement XML
    file and print the intermediate and final figures.

    statement_fpath: path of the statement XML, passed to read_ssa_statement.
    earnings_extension: how to fill in earnings for years after the last
        statement year, up to the last year of Q3 CPI-W data:
        'use_zero' (0.0), 'use_previous' (repeat the preceding year) or
        'use_ss_earnings_cap' (the taxable maximum of that year).

    Nothing is returned; results are printed. A Chrome browser is opened to
    scrape the SSA reference tables and is closed again even on failure.

    Raises ValueError for an unknown earnings_extension (checked before any
    file or network access) or when the taxable maximum of a needed year is
    missing. Raises KeyError when a scraped table has no entry for a needed
    year (for example the AWI of the year the person turns 60, or the bend
    points of the year the person turns 62).
    """
    valid_modes = ("use_zero", "use_previous", "use_ss_earnings_cap")
    if earnings_extension not in valid_modes:
        raise ValueError("earnings_extension must be 'use_zero', 'use_previous' or 'use_ss_earnings_cap'")

    ssa_statement = read_ssa_statement(statement_fpath)
    awi, q3_avg_cpiw, pia_bend_points, taxable_max = _scrape_ssa_tables()

    # From Q3 average CPI-W to a COLA (cost of living adjustment) index that
    # can never decrease:
    #   COLA_IDX[0] = Q3_AVG_CPI-W[0]
    #   COLA_IDX[t] = MAX(COLA_IDX[t-1], Q3_AVG_CPI-W[t])
    cola_idx = q3_avg_cpiw.cummax()
    cola_idx.name = 'COLA_INDEX'

    # 1. year of birth
    dob = ssa_statement['UserInformation']['DateOfBirth']
    yob = dob.year

    # 2. year at 60 (earnings are wage-indexed to this year)
    y60 = yob + 60
    awi_y60 = awi.loc[y60]

    # 3. year at full retirement age
    fra_years, fra_months = _full_retirement_age(dob)
    yfra = yob + fra_years
    if dob.month + fra_months > 12:
        yfra += 1
    print(f'Year at full retirement age: {yfra}')

    # 4. indexed earnings
    earn = ssa_statement['Earnings']['FicaEarnings']
    last_cola_year = q3_avg_cpiw.index.max()
    last_earn_year = earn.index.max()
    if last_cola_year > last_earn_year:
        # Earnings stop before the COLA data does: fill in the missing years.
        print(f"Earnings end at {last_earn_year}, but COLA available through {last_cola_year}. Extending...")
        earn = _extend_earnings(earn, last_cola_year, earnings_extension, taxable_max)
    print(f'last earnings: {earn.iloc[-1]}')

    idx_earn = earn.copy()
    idx_earn.name = 'Idx_Earn'
    y0 = idx_earn.index[0]
    # Wage-index earnings up to age 60. Later years are left at face value;
    # inflation after 60 is handled by the COLA step below.
    idx_earn.loc[y0:y60] = idx_earn.loc[y0:y60] / awi.loc[y0:y60] * awi_y60
    best_idx_earn = idx_earn.sort_values(ascending=False)

    # 5. AIME (average indexed monthly earnings): the best 35 years spread
    # over 35 * 12 = 420 months; missing years count as zero.
    print('calculate AIME (average indexed monthly earnings)')
    aime = best_idx_earn.iloc[:35].sum() / 420
    print(f'AIME: {aime}')

    # 6. PIA (primary insurance amount), using the bend points of the year
    # the person turns 62
    y62 = yob + 62
    bpoints = pia_bend_points.loc[y62, :]
    b1 = bpoints['First']
    b2 = bpoints['Second']
    print(f'Bend points at age 62 --- b1: {b1}, b2: {b2}')
    print('calculate PIA (primary insurance amount)...')
    pia = _primary_insurance_amount(aime, b1, b2)
    print(f'PIA: {pia}')

    # New PIA (COLA adjusted)
    print('calculate New PIA (PIA with COLA adjustment from age 60 to now')
    y_benefit = earn.index[-1] + 1  # latest available SS taxable earnings + 1yr
    print(f'Year of benefit: {y_benefit}')
    # NOTE(review): the COLA ratio is based on the index of the year the
    # person turns 60; confirm against SSA rules whether the base should be
    # a later year (COLAs may only start applying at first eligibility).
    new_pia = pia * cola_idx.loc[y_benefit - 1] / cola_idx.loc[y60]
    print(f'New PIA: {new_pia}')

    # The remaining steps are rough approximations; refine them as needed.
    # A reduction for claiming before full retirement age is not implemented.

    # Delayed retirement credit when claiming at 70: 2/3 of 1% per month
    # (8% per year) between full retirement age and 70, so FRA 66 gives 32%
    # and FRA 67 gives 24%.
    months_delayed = 70 * 12 - (fra_years * 12 + fra_months)
    delay_factor = (300 + 2 * months_delayed) / 300
    new_pia_w_dlay_credit = new_pia * delay_factor
    print(f'new_pia_w_dlay_credit: {new_pia_w_dlay_credit}')

    # Spouse benefit taken as half of the worker's COLA-adjusted PIA.
    spouse_benefit = new_pia / 2
    print(f'spouse_benefit: {spouse_benefit}')
    print(f'couple_benefit: {new_pia_w_dlay_credit+spouse_benefit}')
    print(f'couple_yearly_benefit: {12*(new_pia_w_dlay_credit+spouse_benefit)}')
# -------------------------------------------------------------------------------------
# Path of the SSA statement XML file to analyse (placeholder: replace it).
statement_fpath = "your statement path here"

# How to fill in earnings for years after the statement ends.
# Alternatives to the value below: 'use_zero', 'use_previous'.
earnings_extension = 'use_ss_earnings_cap'

calculate_ssa_retirement_benefit(
    statement_fpath=statement_fpath,
    earnings_extension=earnings_extension,
)
Python Code here...
所有跟帖:
•
跟YouTube 学了三个小时的python 课程,现在有点明白了
-千万里-
♂
(200 bytes)
()
01/30/2026 postreply
01:47:44
•
在家里我下载了Anaconda,公司里用的是Mamba
-slow_quick-
♂
(412 bytes)
()
01/30/2026 postreply
07:28:23
•
有几个module不在Anaconda distribution里要自己下载,很容易的
-slow_quick-
♂
(271 bytes)
()
01/30/2026 postreply
08:12:36