| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549 |
- """
- BudgetX
- 這是一隻用來比對以及檢查預算書的工具程式。
- 它主要是設計給需要確認預算書的主管,來快速確認同仁們寫的預算書沒有明顯的問題。
- 它主要使用的函式庫為:
- Gradio 提供 Web UI
- Camelot 提供 PDF 表格萃取
- Pandas 提供 分析表格
- 版權所有 © 2025 亞新工程顧問股份有限公司
- Copyright © 2025 Moh And Associates, All Rights Reserved.
- """
- import gradio as gr
- import camelot
- import pandas as pd
- import glob
- import math
# Global variables to store PDF data, generated by Qwen code?
# Currently disabled; may make sense to reinstate if performance becomes a
# problem in the future.
#pdf1_data = None
#pdf2_data = None
#dropdown_options = []
#table_data = []

# Constant strings.
# HTML wrapper placed around every status message returned by update_table.
MSG_PREFIX = "<span style='color: red; font-style: italic'>"
MSG_POSTFIX = "</span>"
# Substring looked for in the items column to recognise a grand-total row.
GRAND_TOTAL_NAME = "總價"

# DataFrame column names of the three result tables
# (item comparison, price verification, code verification).
COMP_COLS = ["參考預算書獨有項目 (-)", "共有項目 (∩)", "被檢驗預算書獨有項目 (+)"]
PRICE_COLS = ["項目", "單位", "數量", "單價", "復價", "檢查", "被檢驗預算書價格 %", "參考預算書價格 %"]
CODE_VERIFY_COLS = ["項目", "單位", "編碼", "章碼章名(1~5碼)檢查", "6碼請您比對", "7碼請您比對", "8碼請您比對", "9碼請您比對", "10碼比對"]

# Hard-coded row and column positions for the demo.
# May have to turn these into dynamically-determined values later on if the format is flexible.
# The *_COL values are positional (iloc) column indexes into the extracted tables.
ITEMS_COL = 1
UNIT_COL = 2
AMOUNT_COL = 3
UNITPRICE_COL = 4
TOTAL_COL = 5
CODE_COL = 6
# Index of the row inside each extracted table that holds the column headers;
# rows up to and including it are not treated as data.
HEADER_ROW = 2

# Work codes.
# Directory prefix that the 5-character chapter is appended to when looking for
# a work code file.
# NOTE(review): the separators are Windows-style backslashes - confirm the
# deployment platform, this prefix is not a directory path elsewhere.
CODE_DIR = "workcodes\\raw\\"
# Column names expected inside a work code file.
CHAPTER = "章碼"
CHAPTER_NAME = "章名"
CODE_6 = "6碼"
CODE_6_NAME = "6碼名稱"
CODE_7 = "7碼"
CODE_7_NAME = "7碼名稱"
CODE_8 = "8碼"
CODE_8_NAME = "8碼名稱"
CODE_9 = "9碼"
CODE_9_NAME = "9碼名稱"
CODE_10 = "10碼"
CODE_10_NAME = "10碼名稱"
def process_pdf(file_path):
    """
    Processes a PDF file, given the path.

    Reads every table camelot finds in the PDF and stacks their data rows into a
    single pandas DataFrame. The row at index HEADER_ROW of the first usable table
    supplies the column names; in every usable table the rows up to and including
    HEADER_ROW are dropped. Tables with HEADER_ROW rows or fewer are skipped (and
    reported on stdout).

    The header row index is hard-coded for now as HEADER_ROW, but we may choose to
    determine this dynamically in the future, if deemed necessary.

    Args:
        file_path (string): The path to the PDF file, as provided by the gradio.File input box.

    Returns:
        A pandas DataFrame containing the concatenated table contents,
        None when no table has more than HEADER_ROW rows, or
        an "Error: ..." string when anything goes wrong (including a table whose
        column count differs from the header's). Callers tell the cases apart
        with isinstance(result, pd.DataFrame).
    """
    try:
        # Extract tables from PDF using Camelot
        tables = camelot.read_pdf(file_path, pages='all')
        header = None
        parts = []
        for i, table in enumerate(tables):
            df = table.df
            if len(df) <= HEADER_ROW:
                print(f"Table {i} has less than or equal to {HEADER_ROW} rows! Skipping Table:\n{df}")
                continue
            if header is None:
                header = df.iloc[HEADER_ROW].tolist()
            # Work on a copy so camelot's own DataFrame is not renamed in place.
            data_rows = df.iloc[HEADER_ROW + 1:].copy()
            # Raises ValueError on a column-count mismatch; reported below.
            data_rows.columns = header
            parts.append(data_rows)
        if header is None:
            return None
        # Single concat instead of growing the frame table by table.
        return pd.concat(parts)
    except Exception as e:
        # Deliberately broad: this is the boundary where any extraction failure
        # is turned into a message for the UI.
        return f"Error: {str(e)}"
def comp_items(pdf1_data, pdf2_data):
    """
    Compares the item columns of two PDFs, building the set differences between them.

    Duplicate items inside one PDF are ignored (the comparison is set based) and
    items must match exactly to count as shared.

    Args:
        pdf1_data (DataFrame): The combined tables from PDF1, as generated by process_pdf
        pdf2_data (DataFrame): The combined tables from PDF2, as generated by process_pdf

    Returns:
        A pandas DataFrame with the three COMP_COLS columns: items only in PDF1,
        items in both, and items only in PDF2. The three groups are stacked one
        after another, so each row has a value in exactly one column and ''
        in the others. Within a group the items are sorted, so the table is
        identical from run to run.
    """
    items1 = set(pdf1_data.iloc[:, ITEMS_COL])
    items2 = set(pdf2_data.iloc[:, ITEMS_COL])
    groups = (items1 - items2, items1 & items2, items2 - items1)
    # Set iteration order is not stable between interpreter runs; sort so the
    # displayed table is deterministic. key=str keeps this safe for odd cells.
    frames = [
        pd.DataFrame({column: sorted(values, key=str)})
        for column, values in zip(COMP_COLS, groups)
    ]
    # ignore_index gives the stacked rows unique labels.
    return pd.concat(frames, ignore_index=True).fillna('')
def _parse_amount(text):
    """
    Converts a number string with optional thousands separators to a float.

    Raises:
        ValueError: If the text is not a number (for example an empty cell).
    """
    return float(text.replace(",", ""))


def _percent_of(value, whole):
    """
    Formats value / whole as a percentage string rounded to two decimals.

    Returns '' when whole is None or 0, so a budget without a usable grand
    total leaves the cell blank instead of raising ZeroDivisionError.
    """
    if not whole:
        return ""
    return str(round(value / whole * 100, 2)) + "%"


def verify_prices(pdf1_data, pdf1_total_price, pdf2_data, pdf2_total_price):
    """
    Runs a variety of tests against the prices in the second budget PDF.

    For every row of PDF2 whose amount, unit price and total all parse as numbers,
    checks that amount * unit price is within 1.0 of the total and reports the
    row's share of PDF2's grand total, next to the share the same item has in
    PDF1 (first matching item). Rows that do not parse are skipped. A final row
    compares the grand total with the sum of all row totals, or reports that no
    grand total was found. Additional grand-total rows after the first one are
    reported as their own warning rows.

    Args:
        pdf1_data (DataFrame): The combined tables from PDF1, as generated by process_pdf
        pdf1_total_price (float): Grand total of PDF1, as returned by get_total_price
        pdf2_data (DataFrame): The combined tables from PDF2, as generated by process_pdf
        pdf2_total_price (float): Grand total of PDF2, as returned by get_total_price

    Returns:
        A pandas DataFrame with the PRICE_COLS columns containing the price verifications
    """
    blank_row = dict.fromkeys(PRICE_COLS, "")
    rows = []
    grand_total_name = ""
    grand_total = 0
    running_total = 0

    # First total string seen for each item of the reference budget, built once
    # instead of filtering pdf1_data again for every row of pdf2_data.
    pdf1_totals = {}
    for ref_item, ref_total in zip(pdf1_data.iloc[:, ITEMS_COL], pdf1_data.iloc[:, TOTAL_COL]):
        pdf1_totals.setdefault(ref_item, ref_total)

    for record in pdf2_data.itertuples(index=False, name=None):
        item = record[ITEMS_COL]
        unit = record[UNIT_COL]
        amount_str = record[AMOUNT_COL]
        unit_price_str = record[UNITPRICE_COL]
        total_str = record[TOTAL_COL]

        if GRAND_TOTAL_NAME in item and amount_str == "" and unit_price_str == "" and total_str != "":
            if grand_total_name != "":
                # Multiple grand totals: keep the first, flag the others.
                rows.append({
                    **blank_row,
                    PRICE_COLS[0]: item,
                    PRICE_COLS[5]: f"有多個 總價 欄位!第一個找到的: {grand_total_name}",
                })
                continue
            try:
                grand_total = _parse_amount(total_str)
            except ValueError:
                # Not a usable grand total; keep looking.
                continue
            grand_total_name = item
            continue

        try:
            amount = _parse_amount(amount_str)
            unit_price = _parse_amount(unit_price_str)
            total = _parse_amount(total_str)
        except ValueError:
            # Non-value row (section title, blank line, ...): skip it.
            continue

        running_total += total
        check = "數量乘單價與復價的差異大於 1 !" if abs(amount*unit_price - total) > 1.0 else "OK"
        # Share of this row in the checked budget (pdf2).
        pdf2_percent = _percent_of(total, pdf2_total_price)
        # Share of the same item in the reference budget (pdf1); blank when the
        # item is missing there or its total cannot be used.
        try:
            pdf1_percent = _percent_of(_parse_amount(pdf1_totals[item]), pdf1_total_price)
        except (KeyError, ValueError, AttributeError, TypeError):
            pdf1_percent = ""

        rows.append({
            PRICE_COLS[0]: item,
            PRICE_COLS[1]: unit,
            PRICE_COLS[2]: round(amount, 2),
            PRICE_COLS[3]: round(unit_price, 2),
            PRICE_COLS[4]: round(total, 2),
            PRICE_COLS[5]: check,
            PRICE_COLS[6]: pdf2_percent,
            PRICE_COLS[7]: pdf1_percent,
        })

    # Append the grand total row.
    if grand_total_name == "":
        rows.append({**blank_row, PRICE_COLS[5]: "預算書中沒有 總價!"})
    else:
        check = f"總價與所有復價加總大於 1 !復價加總:{running_total}" if abs(grand_total - running_total) > 1.0 else "OK"
        rows.append({
            **blank_row,
            PRICE_COLS[0]: grand_total_name,
            PRICE_COLS[4]: grand_total,
            PRICE_COLS[5]: check,
        })

    # Building the frame from row dicts avoids the all-scalar DataFrame
    # constructor error and the repeated concat.
    return pd.DataFrame(rows, columns=PRICE_COLS)
def get_total_price(pdf_data):
    """
    Finds the grand total of a budget table.

    Scans the rows in order for the first one whose item text contains
    GRAND_TOTAL_NAME, whose amount and unit price cells are empty and whose
    total cell is not. If that total does not parse as a number the row is
    reported on stdout and the scan continues. The result is also printed.

    Args:
        pdf_data (DataFrame): The combined tables from a PDF, as generated by process_pdf

    Returns:
        The grand total as a float, or 0 when no usable grand-total row exists.
    """
    grand_total = 0
    for position, cells in enumerate(pdf_data.itertuples(index=False, name=None)):
        label = cells[ITEMS_COL]
        amount_text = cells[AMOUNT_COL]
        unit_price_text = cells[UNITPRICE_COL]
        total_text = cells[TOTAL_COL]
        looks_like_total = (
            GRAND_TOTAL_NAME in label
            and amount_text == ""
            and unit_price_text == ""
            and total_text != ""
        )
        if not looks_like_total:
            continue
        try:
            grand_total = float(total_text.replace(",", ""))
        except ValueError as v:
            print(f"String to float Error: {v}. Skipping non-value row:\n{pdf_data.iloc[position]}")
            continue
        # First usable grand total wins.
        break
    print(f"Total Prices :\n{grand_total}")
    return grand_total
def read_code_file(chapter):
    """
    Attempts to find and read in the work code file of the given chapter.

    Looks inside CODE_DIR for files whose name starts with the chapter. Exactly
    one match that is an Excel file is read with pandas' read_excel, taking the
    second sheet row as the header and every cell as a string. Any other outcome
    (no match, several matches, a non-Excel match) yields None; the last two are
    also reported on stdout.

    TODO: Maybe make the cache global and move it in here?

    Args:
        chapter (str) The 5-digit work code chapter to look for.

    Returns:
        The DataFrame representing the Work Code file, or None when no single
        Excel file matches.
    """
    # The chapter comes from PDF text, so escape glob metacharacters in it.
    # Forward slashes are accepted by glob on Windows as well, which lets the
    # backslash-separated CODE_DIR work on other platforms too.
    code_file = CODE_DIR.replace("\\", "/") + glob.escape(chapter) + "*"
    file_match = glob.glob(code_file)
    if not file_match:
        return None
    if len(file_match) > 1:
        print(f"Multiple matches found for: {code_file}! Matches: {file_match}")
        return None

    match = file_match[0]
    if match.lower().endswith((".xls", ".xlsx")):
        return pd.read_excel(match, header=1, dtype=str)
    print(f"Single file match found, but it is not an excel file?! Match: {match}")
    return None
def code_69_checks(code_value, code_df, code_col, code_name_col):
    """
    Verifies one of Code 6~9 against a work code table.

    This only works for the simple work code file format, where each position
    from 6 to 9 has its own, non-conditional pair of columns; other formats
    will need their own checks. Example codes this works on include:
    02231, 02336, 02751

    The name found for the code is reported as "<label> = <name>", where the
    label is the entry at index 0 of the name column. For code "0" a missing
    or blank name is reported as "不分類".

    Args:
        code_value (str) The single-digit code entered at this position.
        code_df (DataFrame) The DataFrame of the code table, as read from Excel (or cache).
        code_col (str) The (constant) string of the code column name.
        code_name_col (str) The (constant) string of the code name column name.

    Returns:
        A string to display in the result table: the name on a unique match,
        otherwise a message saying the code is not allowed, is defined more
        than once, or that the columns are missing from the table.
    """
    try:
        hits = code_df.loc[code_df[code_col] == code_value]
        hit_count = len(hits)
        if hit_count == 0:
            return f"{code_col} {code_value} 並不合規!"
        if hit_count > 1:
            return f"編碼檔案中對 {code_col} : {code_value} 有重複定義!"

        name = hits[code_name_col][hits.index[0]]
        if code_value == "0":
            missing = isinstance(name, float) and math.isnan(name)
            blank = isinstance(name, str) and not name.strip()
            if missing or blank:
                name = "不分類"
        return f"{code_df[code_name_col][0]} = {name}"
    except KeyError:
        return f"編碼檔案中找不到 {code_col} 或是 {code_name_col} 欄位!"
def verify_codes(pdf2_data):
    """
    Verifies the work codes against government-published work code files.

    Rows whose code has fewer than 10 characters are skipped (and reported on
    stdout). For every other row the first 5 characters select the work code
    file; the chapter column of that file must equal those 5 characters and
    the chapter name must appear in the item text. Positions 6 to 9 are looked
    up with code_69_checks, and the unit registered for position 10 must equal
    the row's unit.

    Args:
        pdf2_data (DataFrame): The combined tables from PDF2, as generated by process_pdf

    Returns:
        A pandas DataFrame with the CODE_VERIFY_COLS columns containing the code verifications
    """
    rows = []
    # chapter -> work code DataFrame, or None when no usable file was found.
    # Misses are cached too, so the lookup runs once per chapter.
    code_df_cache = {}
    for record in pdf2_data.itertuples(index=False, name=None):
        item = record[ITEMS_COL]
        unit = record[UNIT_COL]
        code = record[CODE_COL]
        if len(code) < 10:
            print(f"verify_codes: Skipping row: {item}")
            continue

        # The files are coded with the first 5 numbers in the code.
        chapter = code[0:5]
        if chapter not in code_df_cache:
            code_df_cache[chapter] = read_code_file(chapter)
        code_df = code_df_cache[chapter]

        check = "OK"
        check_6 = check_7 = check_8 = check_9 = check_10 = ""
        if not isinstance(code_df, pd.DataFrame):
            check = "找不到這個編碼 (前5碼開頭) 的編碼檔案!請再確認編碼正確,或跟軟體工程師回報編碼檔需要更新!"
        else:
            try:
                if chapter != code_df[CHAPTER][0]:
                    check = f"檔案名稱與章碼不符合?!請跟開發人員聯繫,更新 {chapter} 章的檔案!"
                else:
                    chapter_name = code_df[CHAPTER_NAME][0]
                    if chapter_name not in item:
                        check = f"項目名稱中沒有章節名稱:{chapter_name}"
            except KeyError:
                # Report a file without the chapter columns in the table, like
                # the other column checks, instead of aborting the whole run.
                check = f"編碼檔案中找不到 {CHAPTER} 或是 {CHAPTER_NAME} 欄位!"

            # Code 6 ~ 9 checks:
            check_6 = code_69_checks(code[5:6], code_df, CODE_6, CODE_6_NAME)
            check_7 = code_69_checks(code[6:7], code_df, CODE_7, CODE_7_NAME)
            check_8 = code_69_checks(code[7:8], code_df, CODE_8, CODE_8_NAME)
            check_9 = code_69_checks(code[8:9], code_df, CODE_9, CODE_9_NAME)

            # Code 10 check: the name registered for the code is compared
            # with the row's unit.
            try:
                code_10 = code[9:10]
                matches = code_df.loc[code_df[CODE_10] == code_10]
                if len(matches) == 0:
                    check_10 = f"{CODE_10} {code_10} 並不合規!"
                elif len(matches) > 1:
                    check_10 = f"編碼檔案中對 {CODE_10} : {code_10} 有重複定義!請跟軟體工程師回報編碼檔錯誤!"
                else:
                    match_val = matches[CODE_10_NAME][matches.index[0]]
                    print(f"match_val {match_val} for code {code_10}")
                    if unit == match_val:
                        check_10 = "OK"
                    else:
                        check_10 = f"單位不符!編碼單位應該是:{match_val}"
            except KeyError:
                check_10 = f"編碼檔案中找不到 {CODE_10} 或是 {CODE_10_NAME} 欄位!請跟軟體工程師回報研究檔案格式問題!"

        rows.append(dict(zip(
            CODE_VERIFY_COLS,
            (item, unit, code, check, check_6, check_7, check_8, check_9, check_10),
        )))

    # One frame built at the end instead of a concat per row.
    return pd.DataFrame(rows, columns=CODE_VERIFY_COLS)
def update_table(pdf1, pdf2):
    """
    Processes both PDFs and runs the verification routines.

    Each uploaded file is parsed with process_pdf and its grand total is read
    with get_total_price. The first file that fails to parse ends the call with
    an error status. The three verifications only run once both files have
    been parsed.

    Args:
        pdf1 (gradio.File): Gradio File Input box for PDF1 (reference budget).
        pdf2 (gradio.File): Gradio File Input box for PDF2 (budget being checked).

    Returns:
        A 4-tuple:
            1. Item Comparison DataFrame (or None)
            2. Price Verification DataFrame (or None)
            3. Code Verification DataFrame (or None)
            4. Status message string, wrapped in MSG_PREFIX / MSG_POSTFIX
    """
    def as_status(text):
        # Wraps a message in the HTML used for the status line.
        return MSG_PREFIX + text + MSG_POSTFIX

    # Locals for now; could become a module-level cache if processing ever
    # needs to be sped up.
    pdf1_data = pdf1_total_price = None
    pdf2_data = pdf2_total_price = None

    # Process first PDF
    if pdf1:
        result = process_pdf(pdf1.name)
        if not isinstance(result, pd.DataFrame):
            return None, None, None, as_status(f"參考預算書 PDF 處理錯誤: {result}")
        pdf1_data = result
        pdf1_total_price = get_total_price(pdf1_data)

    # Process second PDF
    if pdf2:
        result = process_pdf(pdf2.name)
        if not isinstance(result, pd.DataFrame):
            return None, None, None, as_status(f"被檢驗預算書 PDF 處理錯誤: {result}")
        pdf2_data = result
        pdf2_total_price = get_total_price(pdf2_data)

    # Until both are populated there is nothing to compare: status message only.
    if pdf1_data is None or pdf2_data is None:
        return None, None, None, as_status("PDF 檔案讀取 OK")

    # 1. Compare items in the items column
    comp = comp_items(pdf1_data, pdf2_data)
    if not isinstance(comp, pd.DataFrame):
        return None, None, None, as_status(f"預算書 比對 失敗!錯誤訊息:{comp}")

    # 2. Verify prices
    price = verify_prices(pdf1_data, pdf1_total_price, pdf2_data, pdf2_total_price)
    if not isinstance(price, pd.DataFrame):
        return comp, None, None, as_status(f"預算書 價格確認 失敗!錯誤訊息:{price}")

    # 3. Verify codes
    code = verify_codes(pdf2_data)
    if not isinstance(code, pd.DataFrame):
        return comp, price, None, as_status(f"預算書 編碼確認 失敗!錯誤訊息:{code}")

    return comp, price, code, as_status("預算書比對成功")
with gr.Blocks(title="亞新 BudgetX") as app:
    # This is the UI layout, Gradio style. UI elements can be added here.
    # gr.HTML("<img src='/file=MAALogoBlueCropped.png'>")
    gr.Markdown("# MAA BudgetX 亞新預算書檢驗系統 (2025-10 Proof of Concept 版)")
    gr.Markdown("---")
    gr.Markdown("### _請上傳參考預算書與被檢驗預算書_")
    with gr.Row():
        # Left column for first PDF (reference budget)
        with gr.Column():
            pdf1_input = gr.File(label="參考預算書 PDF", file_types=[".pdf"])
        # Right column for second PDF (budget being checked)
        with gr.Column():
            pdf2_input = gr.File(label="被檢驗預算書 PDF", file_types=[".pdf"])
    # Status line; update_table returns HTML for it.
    error_output = gr.HTML(MSG_PREFIX+"系統訊息"+MSG_POSTFIX)

    # 1. Item comparison between the two budgets
    gr.Markdown("---")
    gr.Markdown("### _1. 項目比對_")
    gr.Markdown("* _比對參考預算書與被檢驗預算書中的項目欄中的獨有項目。_")
    gr.Markdown("* _無視重複項目。_")
    gr.Markdown("* _些微差異 (空格、錯題字等) 也會當成不同的項目。_")
    # A column-selection dropdown was prototyped here and is currently disabled.
    comp_table = gr.Dataframe(label="預算書「項目及說明」列比對", headers=COMP_COLS)

    # 2. Price verification
    gr.Markdown("---")
    gr.Markdown("### _2. 價格檢查_")
    gr.Markdown("* _拿 數量 乘上 單價,確認結果跟 復價 相差 1.0 以內_")
    gr.Markdown("* _總價是搜尋有 總價 的欄位,並將它的 復價 欄位跟其他的 復價 的加總比較,要求 相差 1.0 以內_")
    price_table = gr.Dataframe(label="價格確認", headers=PRICE_COLS)

    # 3. Code verification
    gr.Markdown("---")
    gr.Markdown("### _3. 編碼檢查_")
    gr.Markdown("* _讀取事前下載的編碼列表,但仍有不少問題..._")
    code_table = gr.Dataframe(label="編碼比對", headers=CODE_VERIFY_COLS)
    gr.Markdown("---")

    # Event handling.
    # The former identity listeners (fn=lambda x: x with no outputs) fed the
    # disabled PDF previews; with nothing to update they only cost an extra
    # round trip per upload, so they are no longer registered.
    # Re-run the checks whenever either file input changes; update_table itself
    # waits until both files are present before comparing.
    inputs = [pdf1_input, pdf2_input]
    outputs = [comp_table, price_table, code_table, error_output]
    pdf1_input.change(update_table, inputs=inputs, outputs=outputs)
    pdf2_input.change(update_table, inputs=inputs, outputs=outputs)

if __name__ == "__main__":
    # Setting server_name to 0.0.0.0 allows us to listen to all interfaces.
    app.launch(server_name="0.0.0.0")
|