Import Libraries¶
In [11]:
#!/usr/bin/env python3
import pandas as pd
from datetime import datetime
from termcolor import colored
Initialise notebook options¶
In [12]:
source_path = "/home/hass/Development/smmtdata-evolved/ocr/"
#source_path = "ocr/"
import_name = source_path + "OUT_3b_custtype_CLEANSE_newformat-2023.csv"
output_name = source_path + "OUT_3c_custtype_FINISHED_newformat-2023.csv"
error_name = source_path + "OUT_3d_custtype_FINISHED_newformat_errors-2023.log"
# Constants
num_lines = 0 # lines in the import file, determined later
fieldcount = 6 # Number of lines per month including month header
cat_count = 4 # number of lines with data to capture
In [13]:
## Count lines for import
with open(import_name, 'r') as linecount:
    num_lines = sum(1 for line in linecount)
print("\n")
print("================================================================================================================================")
print("Opening File : " + import_name)
print("Expecting : " + str(int(num_lines / fieldcount * cat_count * 2)) + " Rows")
print("Output File : " + output_name)
print("================================================================================================================================")
print("\n")
================================================================================================================================
Opening File : /home/hass/Development/smmtdata-evolved/ocr/OUT_3b_custtype_CLEANSE_newformat-2023.csv
Expecting : 112 Rows
Output File : /home/hass/Development/smmtdata-evolved/ocr/OUT_3c_custtype_FINISHED_newformat-2023.csv
================================================================================================================================
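The "Expecting" figure is simple arithmetic over the constants above: each month occupies fieldcount lines of the input but yields cat_count categories for two source columns (Primary and Secondary). A minimal illustration, using the 84-line input implied by the 112-row figure in this run (the demo_ names are hypothetical so the real variables are untouched):

# Illustrative arithmetic only - 112 expected rows implies an 84-line input
demo_num_lines = 84                                      # lines in the input CSV for this run
demo_month_blocks = demo_num_lines // fieldcount         # 14 month blocks of 6 lines each
demo_expected_rows = demo_month_blocks * cat_count * 2   # 2 sources (Primary/Secondary) per category
print(demo_expected_rows)                                # 112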
In [14]:
# Import file into Pandas dataframe
import_dataframe = pd.read_csv(import_name, header=None, dtype={0 : object, 1 : object, 2 : object})
# Create Pandas dataframe for the transposed record
output_table = pd.DataFrame(columns=[
    "Usage",
    "Source",
    "Year",
    "Month",
    "Topcat",
    "Subcat",
    "Quantity",
    "Acquire_Date",
    "Acquire_By",
    "Acquire_Method",
    "fileDate"
])
In [15]:
# Error handling
debug = 0
error_file = open(error_name, 'w')
def error_rep(ir, sd, fld, error):  # input row, source label, field name, error value
    error_file.write(str(ir) + " " + sd + " " + fld + " " + error + "\n")
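For reference, a hypothetical call like the one below (the values are invented) appends one plain-text line to the error log; the transposition loop further down uses the same pattern:

# Hypothetical values, purely to show the log line format
error_rep(14, "Mar-23", "Fleet", "err")   # writes "14 Mar-23 Fleet err" to the error log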
Data cleansing¶
In [16]:
# Map OCR sub-categories to top categories (keys match the .capitalize()-d OCR strings)
TOPCATLOOKUP = {
    "Diesel": "Fossil",
    "Petrol": "Fossil",
    "AFV": "xHEV",
    "Total": "Total",
    "Private": "Customer",
    "Fleet": "Customer",
    "Business": "Customer",
    "Mhev_diesel": "xHEV",
    "Mhev_petrol": "xHEV",
    "Hev": "xHEV",
    "Bev": "Plugin",
    "Phev": "Plugin"
}

def get_tcat(g_scat):
    # Return the top category for a sub-category, or "err" if it is not recognised
    try:
        g_topcat = TOPCATLOOKUP[g_scat]
    except KeyError:
        g_topcat = "err"
    return g_topcat
# Normalise OCR sub-category names
SCATLOOKUP = {
    "Diesel": "Diesel",
    "Petrol": "Petrol",
    "AFV": "AFV",
    "Total": "Total",
    "Private": "Private",
    "Fleet": "Fleet",
    "Business": "Business",
    "Mhev_diesel": "MHEV_Diesel",
    "Mhev_petrol": "MHEV_Petrol",
    "Hev": "xHEV",
    "Bev": "Plugin",
    "Phev": "Plugin"
}

def get_scat(g_scat):
    # Return the normalised sub-category, or "err" if it is not recognised
    try:
        g_scat = SCATLOOKUP[g_scat]
    except KeyError:
        g_scat = "err"
    return g_scat
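A quick spot check of the two lookups shows the "err" fallback in action: a clean sub-category resolves normally, while an OCR-mangled string (the "Dlesel" below is just an invented misread) falls through the KeyError handler:

print(get_scat("Fleet"), get_tcat("Fleet"))    # Fleet Customer
print(get_scat("Dlesel"), get_tcat("Dlesel"))  # err err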
Add data to table¶
In [17]:
def addtotbl(sub_cat, top_cat, month, year, source, qty):
    # print(sub_cat, month, year, source, qty)
    # For this customer-type dataset every valid sub-category rolls up to "Customer";
    # anything else is flagged so it shows up in the error checks
    if sub_cat in ("Total", "Private", "Fleet", "Business"):
        topcat = "Customer"
    else:
        topcat = "err"
    new_row = {"Usage" : "Private",
               "Source" : source,
               "Year" : year,
               "Month" : month,
               "Topcat" : topcat,
               "Subcat" : sub_cat,
               "Quantity" : qty,
               "Acquire_Date" : datetime.now().strftime("%d-%m-%y"),
               "Acquire_By" : "Tesseract-new",
               "Acquire_Method" : "Manual",
               "fileDate" : month + "-" + year}
    output_table.loc[len(output_table)] = new_row
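As a sanity check, a single hypothetical call (the month, year and quantity are invented) appends one row to output_table and can then be discarded so it never reaches the output CSV:

# Illustrative call only - these values are not taken from the input file
addtotbl("Private", "Customer", "Jan", "2023", "Primary", "12345")
print(output_table.tail(1)[["Month", "Topcat", "Subcat", "Quantity"]])
output_table.drop(output_table.tail(1).index, inplace=True)  # discard the test row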
Shuffle data into the right fields and save it¶
In [18]:
# Declare variables for data transposition
input_row = 0
input_col = 0
cnt = 0
for i in range(0, num_lines, fieldcount):  # step through the file one month block at a time
    input_row = i
    out_usage = "Personal"  # not currently used; Usage is set inside addtotbl
    out_month = import_dataframe.iloc[input_row, input_col].capitalize()
    input_row += 1
    source_year = import_dataframe.iloc[input_row, input_col + 1]
    out_source = out_month + "-" + source_year
    out_year0 = import_dataframe.iloc[input_row, 1]
    out_year1 = import_dataframe.iloc[input_row, 2]
    input_row += 1
    for x in range(1, fieldcount - 1):  # capture the data lines; month header and year rows already consumed
        o_subcat = import_dataframe.iloc[input_row, 0].capitalize()
        out_qty = import_dataframe.iloc[input_row, 1]
        out_subcat = get_scat(o_subcat)
        out_topcat = get_tcat(o_subcat)
        if not out_qty.isnumeric():
            out_qty = "err"
            error_rep(input_row, out_source, "out_total0", out_subcat)
        if out_topcat == "err":
            error_rep(input_row, out_source, out_subcat, out_qty)
        if out_subcat == "err":
            error_rep(input_row, out_source, out_subcat, out_qty)
        if debug == 1:
            print("Called : " + out_subcat + " " + out_month + " " + out_year0 + " " + "Primary" + " " + str(out_qty))
        addtotbl(out_subcat, out_topcat, out_month, out_year0, "Primary", out_qty)
        out_qty = import_dataframe.iloc[input_row, 2]
        if not out_qty.isnumeric():
            out_qty = "err"
            error_rep(input_row, out_source, "out_total1", out_subcat)
        addtotbl(out_subcat, out_topcat, out_month, out_year1, "Secondary", out_qty)
        #print("Year 1")
        input_row += 1
    input_row += 1
error_file.close()
output_table.to_csv(output_name, index=True)
#jname = source_path + "3c-json.json"
#output_table.to_json(jname, index=True)
print("\n")
print("\n")
print("The generated file needs to be validated, OCR is not 100% reliable")
print("================================================================================================================================")
print("\n")
print("\n")
print("Opening File : " + import_name)
print("\n")
print("Output File : " + output_name)
print("\n")
print("Expecting : " + str(int(num_lines / fieldcount * cat_count * 2)) + " Rows")
print("Output File : " + str(len(output_table.index)) + " Rows")
print("\n")
print("Error report : " + error_name)
print("\n")
print("================================================================================================================================")
print("\n")
print("Errors detected:")
print("================================================================================================================================")
error_file = open(error_name, 'r')
line = error_file.readline()
if line == "":
    print("No Errors")
else:
    while line != "":
        error_line = colored(line, "black", "on_white").replace("\n", "")
        print(error_line)
        line = error_file.readline()
error_file.close()
The generated file needs to be validated; OCR is not 100% reliable
================================================================================================================================
Opening File : /home/hass/Development/smmtdata-evolved/ocr/OUT_3b_custtype_CLEANSE_newformat-2023.csv
Output File : /home/hass/Development/smmtdata-evolved/ocr/OUT_3c_custtype_FINISHED_newformat-2023.csv
Expecting : 112 Rows
Rows written : 112 Rows
Error report : /home/hass/Development/smmtdata-evolved/ocr/OUT_3d_custtype_FINISHED_newformat_errors-2023.log
================================================================================================================================
Errors detected:
================================================================================================================================
No Errors
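Because the OCR output still needs eyeballing, a minimal follow-up check (a sketch, assuming the output file produced by this run) can re-read the CSV and confirm the row count and that every Quantity parsed as a number; anything flagged should be compared against the source document by hand.

# Minimal post-run sanity check (sketch) - re-read the generated CSV
check = pd.read_csv(output_name, index_col=0)
print(str(len(check)) + " rows re-read from " + output_name)
bad_qty = check[~check["Quantity"].astype(str).str.isnumeric()]
print(str(len(bad_qty)) + " rows with a non-numeric Quantity")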