import os
import datetime
import pandas as pd
from lxml import etree
#import xlwings as xw
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium_stealth import stealth
from selenium.webdriver.common.by import By
from io import StringIO
from collections.abc import Callable
def read_ssa_statement(xml_path: str) -> dict:
"""
returns a dictionary with 3 key/value:
UserInformation: dictionary of keys Name and DateOfBirth
Earnings: pandas.DataFrame with Year as index, FicaEarnings and MedicareEarnings as columns
the last row might be -1 which indicates data are not available yet and will be removed.
TotalTaxPaid: dictionary of keys:
FicaTaxTotalEmployer, FicaTaxTotalIndividual, MedicareTaxTotalEmployer, MedicareTaxTotalIndividual
"""
tree = etree.parse(xml_path)
root = tree.getroot()
ns = root.nsmap # {'osss': 'http://ssa.gov/osss/schemas/2.0'}
e = root.find(".//osss:UserInformation", namespaces=ns)
user_info = {
"Name": e.findtext("osss:Name", namespaces=ns),
"DateOfBirth": datetime.date.fromisoformat(e.findtext("osss:DateOfBirth", namespaces=ns))
}
rows = []
for e in root.findall(".//osss:Earnings", namespaces=ns):
year = e.get("startYear")
fica = e.findtext("osss:FicaEarnings", namespaces=ns)
medicare = e.findtext("osss:MedicareEarnings", namespaces=ns)
rows.append({
"Year": int(year),
"FicaEarnings": float(fica) if fica else 0.0,
"MedicareEarnings": float(medicare) if medicare else 0.0
})
earnings_hist = (
pd.DataFrame(rows)
.sort_values("Year")
.reset_index(drop=True)
)
earnings_hist.set_index('Year', inplace=True)
if (earnings_hist.iloc[-1, :] < 0).any():
earnings_hist = earnings_hist.iloc[:-1]
total_tax_paid = {
'FicaTaxTotalEmployer': float(root.find(".//osss:FicaTaxTotalEmployer", namespaces=ns).text),
'FicaTaxTotalIndividual': float(root.find(".//osss:FicaTaxTotalIndividual", namespaces=ns).text),
'MedicareTaxTotalEmployer': float(root.find(".//osss:MedicareTaxTotalEmployer", namespaces=ns).text),
'MedicareTaxTotalIndividual': float(root.find(".//osss:MedicareTaxTotalIndividual", namespaces=ns).text)
}
return {
'UserInformation': user_info,
'Earnings': earnings_hist,
'TotalTaxPaid': total_tax_paid
}
def read_awi(driver) -> pd.DataFrame:
url = "https://www.ssa.gov/oact/cola/AWI.html" # this page includes all AWIs since 1951
print(f'navigate to {url}')
driver.get(url)
print('get AWI (average wage index)...')
tbl_element = driver.find_element(By.XPATH, '//table[.//*[contains(text(),"National average wage indexing series, 1951-")]]')
df = pd.read_html(StringIO(tbl_element.get_attribute('outerHTML')))[0].iloc[2:, :-1]
df.columns = ['Year', 'AWI']
df.set_index('Year', inplace=True)
numeric_index = pd.to_numeric(df.index, errors='coerce')
is_NaN = pd.isna(numeric_index)
df = df.loc[~is_NaN].astype(float)
df.index = df.index.astype(int)
return df['AWI'] # pd.Series
# TODO: add exception handling
def read_q3_average_cpiw(driver) -> pd.Series:
url = "https://www.ssa.gov/oact/STATS/avgcpi.html"
print(f'navigate to {url}')
driver.get(url)
print('get average Q3 CPI-W by year...')
tbl_element = driver.find_element(By.XPATH, '//table[.//*[contains(text(),"Average Quarterly, and Average Annual")]]')
df = pd.read_html(StringIO(tbl_element.get_attribute('outerHTML')))[0]
df.set_index('Year', inplace=True)
s = df['Quarter 3'].dropna()
s.index = s.index.astype(int)
s.name = 'AvgQ3CPI-W'
return s.astype(float)
def read_pia_bend_points(driver) -> pd.DataFrame:
url = "https://www.ssa.gov/oact/cola/bendpoints.html"
print(f'navigate to {url}')
driver.get(url)
print('get PIA formula bend points...')
tbl_element = driver.find_element(By.XPATH, '//table[.//table[@summary="PIA and family maximum bend points"]]')
df = pd.read_html(StringIO(tbl_element.get_attribute('outerHTML')))[0].iloc[3:-1,:3]
df.columns = ['Year', 'First', 'Second']
df.set_index('Year', inplace=True)
df.index = df.index.astype(int)
return df.replace('[$,]' , '', regex=True).astype(float)
# -------------------------------------------------------------------
def calculate_ssa_retirement_benefit(statement_fpath: str):
ssa_statement = read_ssa_statement(statement_fpath)
# 1. Set up Chrome options to avoid detection
options = Options()
# options.add_argument("--headless=new") # The modern headless argument
options.add_argument("--incognito")
# Optional: set a consistent window size for reliable results
# options.add_argument("--window-size=1920,1080")
options.add_argument("start-maximized") # Maximize window to mimic typical user behavior
options.add_experimental_option("excludeSwitches", ["enable-automation"]) # Exclude the automation switch
options.add_experimental_option('useAutomationExtension', False)
# You may also want to add a custom, realistic user agent string for an extra layer of customization
# options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36')
# 2. Initialize the WebDriver
driver = webdriver.Chrome(options=options)
# 3. Apply selenium_stealth
stealth(driver,
languages=["en-US", "en"],
vendor="Google Inc.",
platform="Win32",
webgl_vendor="Intel Inc.",
renderer="Intel Iris OpenGL Engine",
fix_hairline=True,
)
awi = read_awi(driver)
q3_avg_cpiw = read_q3_average_cpiw(driver)
pia_bend_points = read_pia_bend_points(driver)
driver.quit()
#return ssa_statement, awi, cpiw, pia_bend_points
# from Q3 Average CPI-W to COLA (cost of living adjustment) index
cola_idx = q3_avg_cpiw.cummax() # cola_index can not decrease
# COLA_IDX[0] = Q3_AVG_CPI-W[0]
# COLA_IDX[t] = MAX(COLA_IDX[t-1], Q3_AVG_CPI-W[t])
cola_idx.name = 'COLA_INDEX'
#calculate benefit
dob = ssa_statement['UserInformation']['DateOfBirth']
# 1. year of bith
yob = ssa_statement['UserInformation']['DateOfBirth'].year
# 2. year at 60
y60 = yob + 60
awi_y60 = awi.loc[y60]
# 3. year at full retirment age
# https://www.ssa.gov/retirement/full-retirement-age
if dob <= datetime.date(1955, 1, 1):
fra = (66, 0)
elif dob <= datetime.date(1956, 1, 1):
fra = (66, 2)
elif dob <= datetime.date(1957, 1, 1):
fra = (66, 4)
elif dob <= datetime.date(1958, 1, 1):
fra = (66, 6)
elif dob <= datetime.date(1959, 1, 1):
fra = (66, 8)
elif dob <= datetime.date(1960, 1, 1):
fra = (66, 10)
else:
fra = (67, 0)
yfra = yob + fra[0]
if dob.month + fra[1] > 12:
yfra += 1
print(f'Year at full retirement age: {yfra}')
# 4. indexed earnings
earn = ssa_statement['Earnings']['FicaEarnings']
idx_earn = earn.copy()
idx_earn.columns = ['Idx_Eearn']
y0 = idx_earn.index[0]
# inflation adjustment earnings to age 60
idx_earn.loc[y0:y60] = idx_earn.loc[y0:y60] / awi.loc[y0:y60] * awi_y60
# no inflation adjusting after 60! This will be taken care of by COLA
best_idx_earn = idx_earn.sort_values(ascending=False)
len_earn = len(best_idx_earn)
# 5. AIME (average indexed monthly earnings)
print('calculate AIME (average indexed monthly earnings)')
aime = best_idx_earn.iloc[0:min(len_earn, 35)].sum()/420
print(f'aime: {aime}')
# 6. PIA (primary insurance amount)
#return pia_bend_points
y62 = yob + 62 # year at 62
bpoints = pia_bend_points.loc[y62, :]
b1 = bpoints['First']
b2 = bpoints['Second']
print(f'bend points at age 62 --- b1: {b1}, b2: {b2}')
print('calculate PIA (primary insurance amount)...')
pia = (
min(aime, b1) * 0.90
+ max(min(aime, b2) - b1, 0) * 0.32
+ max(aime - b2, 0) * 0.15
)
print(f'PIA: {pia}')
# New PIA (COLA adjusted)
print('calculate New PIA (PIA with COLA adjustment from age 60 to now')
y_benefit = earn.index[-1] + 1 # latest available SS taxable earnings + 1yr
new_pia = pia * cola_idx.loc[y_benefit - 1] / cola_idx.loc[y60]
print(f'New PIA: {new_pia}')
# 这里我就淘浆糊了,你们自己脑补
# early discount
# delay retirement credit (assuming full retirement age = 66)
new_pia_w_dlay_credit = new_pia * 1.32 # (70 - 66)*8% = 32%)
# if FRA = 67 then * (1+24%)
print(f'new_pia_w_dlay_credit: {new_pia_w_dlay_credit}')
# ------------------------------------------------------------------------
statement_fpath = r'your ss statement xml file path here'
calculate_ssa_retirement_benefit(statement_fpath)
# ------------------------------------------------------------------------
"""
The code only works for my situation, 70+ who begin withdraw at 70
For younger people, you would need to
* extend the earnings to year you will be 70 year old, using the last value
* extend the COLA index to year you will be 69, using the last value
* set the y_benefit to year you will be 70
"""