摘要:功能描述作业需求额度或自定义实现购物商城,买东西加入购物车,调用信用卡接口结账可以提现,手续费支持多账户登录支持账户间转账记录每月日常消费流水提供还款接口记录操作日志提供管理接口,包括添加账户用户额度,冻结账户等。。。
功能描述
作业需求:
1、额度 15000或自定义 2、实现购物商城,买东西加入购物车,调用信用卡接口结账 3、可以提现,手续费5% 4、支持多账户登录 5、支持账户间转账 6、记录每月日常消费流水 7、提供还款接口 8、ATM记录操作日志 9、提供管理接口,包括添加账户、用户额度,冻结账户等。。。 10、用户认证用装饰器
注意:以上需求,要充分使用函数,请尽你的最大限度来减少重复代码!
流程图程序流程图(待补全)
![]()
bin ├── atm.py # atm入口 ├── __init__.py └── manage.py # 管理入口 conf ├── __init__.py └── settings.py # 配置文件 core ├── accounts.py # 账号添加、修改额度、禁用、启动接口 ├── actions.py # sql动作接口 ├── auth.py # 用户认证接口 ├── database.py # 数据库操作接口 ├── db_handler.py# 无用到 ├── __init__.py ├── logger.py # 日志接口 ├── main.py # 主接口 ├── parsers.py # sql语法解析接口 └── transaction.py # 交易接口 db ├── accounts_table # 用户账号表 └── managers_table # 管理员账号表 log ├── access.log #访问日志 └── transactions.log #交易日志程序主体
github链接
atm.py
#!_*_coding:utf-8_*_ import os import sys base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) print(base_dir) sys.path.append(base_dir) from core import main if __name__ == "__main__": main.run("atm")
manage.py
#!_*_coding:utf-8_*_ import os import sys base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) print(base_dir) sys.path.append(base_dir) from core import main if __name__ == "__main__": main.run("manage")
setting.py
#!_*_coding:utf-8_*_ #__author__:"Alex Li" import os import sys import logging BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Database title summary TITLE = ["id","name","age","phone","dept","enroll_date","expire_date","account","password","credit","balance","status","pay_day"] # Account database setting DATABASE = { "engine": "file_storage", # support mysql, postgresql in the future "name":"accounts_table", "path": "%s/db/" % BASE_DIR } # Manager account database setting MANAGE_DATABASE = { "engine": "file_storage", # support mysql, postgresql in the future "name":"managers_table", "path": "%s/db/" % BASE_DIR } # logger setting LOG_LEVEL = logging.INFO LOG_TYPES = { "transaction": "transactions.log", "access": "access.log", } # Transaction setting TRANSACTION_TYPE = { "repay":{"action":"plus", "interest":0}, "withdraw":{"action":"minus", "interest":0.05}, "transfer":{"action":"minus", "interest":0.05}, "consume":{"action":"minus", "interest":0}, } ACCOUNT_DEFAULT = { "credit": 15000.0 }
accounts.py
#!_*_coding:utf-8_*_ import json import time from core import database from conf import settings from core import parsers from core import actions def load_accounts(account): """ Check account whether exists in a database :param account: :return: """ base_dir = settings.DATABASE["path"] sql_str = "select * from accounts_table where account = %s" % account sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) res = actions.actions(sql_type, dict_sql) if not res: return False else: return True def change_account(account, set_str): """ Change account data :param account: :param set_str: :return: """ base_dir = settings.DATABASE["path"] sql_str = "update accounts_table set %s where account = %s" % (set_str, account) sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) actions.actions(sql_type, dict_sql) def add_account(*args): """ Add an new account :param args: :param kwargs: :return: """ base_dir = settings.DATABASE["path"] sql_str = "add to accounts_table values %s" % (",".join(args)) sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) actions.actions(sql_type, dict_sql)
actions.py
# -*- coding: utf-8 -*- from core import database from conf import settings import re def actions(sql_type,dict_sql): """ sql操作主函数 :param sql_type: sql语句的类型 :return: actions_dict[sql_type] 相应操作的函数 """ actions_dict = {"select": select_action, "add": add_action, "del": del_action, "update": update_action} if sql_type in actions_dict: # 判断导入的sql类型是否在actions_dict字典中定义。 return actions_dict[sql_type](dict_sql) else: return False def select_action(dict_sql): temp_list = [] info = dict_sql["select"] data = database.read_db(dict_sql["from"]) # 获取原始数据库文件中的所有数据,data为列表格式 key = dict_sql["where"][0] # 获取sql语句中where语句的key值。如id = 1,获取id count = 0 for values in data: # 读取data列表中的每一个元素,values是字典格式 if type(values[key]) is int: value = str(values[key]) else: value = """ + str(values[key]) + """ dict_sql["where"][0] = value # 将values[key]的值取出并重新赋值为sql语句的key值。 print(where_action(dict_sql["where"])) if where_action(dict_sql["where"]): # 将新的where语句,发送给where_action语句进行bool判断。 count += 1 temp_list.append(values) return temp_list def add_action(dict_sql): """ 插入动作 获取用户输入的values,并在表中插入 :param dict_sql: parsers函数处理后的字典格式的sql语句 """ data = database.read_db(dict_sql["to"]) # 获取原始数据库文件中的所有数据 value = dict_sql["values"] # 从dict_sql中获取values的列表 t_id = str(int(data[-1]["id"]) + 1) # 获取原始数据库文件中id列最后一行的id数值,并每次自动+1。然后转换为字符串格式 value.insert(0, t_id) # 将添加的id插入到value变量中 if len(value) != len(settings.TITLE): # 判断输入值得长度是否等于数据库文件中定义的列的长度 print("列数不正确") else: data.append(dict(zip(settings.TITLE, value))) # 在获取的原始数据中插入行的数据 database.write_db(dict_sql["to"], data) def del_action(dict_sql): """ 删除动作函数 :param dict_sql: parsers函数处理后的字典格式的sql语句 """ temp_list = [] data = database.read_db(dict_sql["from"]) # 获取原始数据库文件中的所有数据,data为列表格式 key = dict_sql["where"][0] # 获取sql语句中where语句的key值。如id = 1,获取id for values in data: # 读取data列表中的每一个元素,values是字典格式 if type(values[key]) is int: value = str(values[key]) else: value = """ + str(values[key]) + """ dict_sql["where"][0] = value # 将values[key]的值取出并重新赋值为sql语句的key值。 if where_action(dict_sql["where"]): # 将新的where语句,发送给where_action语句进行bool判断。 temp_list.append(values) # 如果符合条件,就从data中移除对应的values return temp_list # print("已删除%s条记录" % len(temp_list)) # for i in temp_list: # data.remove(i) # write_db(dict_sql["from"], data) # 将新生成的data重新写入文件 def update_action(dict_sql): """ 更新动作函数 :param dict_sql: parsers函数处理后的字典格式的sql语句 """ data = database.read_db(dict_sql["update"]) # 获取原始数据库文件中的所有数据,data为列表格式 key = dict_sql["where"][0] # 获取sql语句中where语句的key值。如id = 1,获取id set_key = dict_sql["set"][0] # 获取set语句中用户输入的key set_value = dict_sql["set"][2].strip(""").strip(""") # 获取set语句中用户输入的value count = 0 for values in data: # 读取data列表中的每一个元素,values是字典格式 if type(values[key]) is int: value = str(values[key]) else: value = """ + str(values[key]) + """ dict_sql["where"][0] = value # 将values[key]的值取出并重新赋值为sql语句的key值。 if where_action(dict_sql["where"]): # 将新的where语句,发送给where_action语句进行bool判断。 count += 1 values[set_key] = set_value # 如果符合条件,使用将set_key的值修改为set_value print(data) print("已更新%s条记录" % count) database.write_db(dict_sql["update"], data) # 将新生成的data重新写入文件 def where_action(condition): """ where语句操作函数 :param condition: 判断语句。就是字典中where的值 :return: """ if "like" in condition: # 如果like在语句中 # 将where语句中的第二个参数和,第一个参数进行正则比较。如果执行正常就返回True return re.search(condition[2].strip(""").strip("""), condition[0]) and True else: return eval(" ".join(condition)) # 除此使用eval进行python的逻辑判断
auth.py
#!_*_coding:utf-8_*_ import os from core import parsers from core import actions from core import db_handler from conf import settings from core import logger import json import time def login_required(func): "验证用户是否登录" def wrapper(*args, **kwargs): print(args, kwargs) # print("--wrapper--->",args,kwargs) if args[0].get("is_authenticated"): return func(*args, **kwargs) else: exit("User is not authenticated.") return wrapper def acc_auth(account, password, type): """ 优化版认证接口 :param account: credit account number :param password: credit card password :return: if passed the authentication , retun the account object, otherwise ,return None """ # table = None # base_dir = None if type == "atm": base_dir = settings.DATABASE["path"] table = settings.DATABASE["name"] elif type == "manage": base_dir = settings.MANAGE_DATABASE["path"] table = settings.MANAGE_DATABASE["name"] sql_str = "select * from %s where account = %s" % (table, account) sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) print(dict_sql) res = actions.actions(sql_type, dict_sql) print(res) if not res: print(" 33[31;1mAccount ID and password is incorrect! 33[0m") elif res[0]["password"] == password: print("haha") exp_time_stamp = time.mktime(time.strptime(res[0]["expire_date"], "%Y-%m-%d")) if time.time() > exp_time_stamp: print(" 33[31;1mAccount [%s] has expired,please contact the back to get a new card! 33[0m" % account) elif res[0]["status"] == 1: print(" 33[31;1mAccount [%s] has expired,please contact the back to get a new card! 33[0m" % account) else: # passed the authentication return res[0] else: print(" 33[31;1mAccount ID and password is incorrect! 33[0m") def acc_login(user_data, log_obj, type): """ account login func :user_data: user info data , only saves in memory :return: """ retry_count = 0 while user_data["is_authenticated"] is not True and retry_count < 3 : account = input(" 33[32;1maccount: 33[0m").strip() password = input(" 33[32;1mpassword: 33[0m").strip() auth = acc_auth(account, password, type) if auth: # not None means passed the authentication log_obj.info("account [%s] login system" % account) user_data["is_authenticated"] = True user_data["account_id"] = account return auth retry_count +=1 else: log_obj.error("account [%s] too many login attempts" % account) exit() def acc_logout(user_data, log_obj): account = user_data["account_data"]["name"] user_data["is_authenticated"] = False log_obj.info("account [%s] logout system" % account) exit("account [%s] logout system" % account)
databaase.py
# -*- coding: utf-8 -*- from conf import settings def read_db(table): """ 读取表文件函数。 :param table: 表文件参数 :return: 返回一个包含表文件内容的字典 """ title = settings.TITLE try: main_list = [] with open(table, "r", encoding="utf-8") as rf: for line in rf: temp_list = [] if line.rstrip(" ").split(",") == title: continue else: for values in line.strip(" ").split(","): if values.isdigit(): temp_list.append(int(values)) else: temp_list.append(values) main_list.append(dict(zip(title, temp_list))) return main_list except FileNotFoundError as e: print(e) exit(1) def write_db(table, data): """ 写入表文件函数。 :param table: 表文件参数 :param data: 导入的数据。为字典格式 """ value2 = ",".join(settings.TITLE) + " " for values in data: temp_list = [] for value in values.values(): temp_list.append(str(value)) value2 += ",".join(temp_list) + " " with open(file=table, mode="w", encoding="utf-8") as wf: wf.write(value2) def print_info(info, **kwargs): """ 打印函数。 用于select语句打印显示 :param info: select语句中需要显示的类 :param kwargs: 字典,用于进行操作的原始数据 :return: """ temp_list = [] if info == "*": for key in kwargs: temp_list.append(str(kwargs[key])) print(",".join(temp_list)) else: info_list = info.split(",") for i in info_list: temp_list.append(str(kwargs[i])) print(",".join(temp_list))
logger.py
#!_*_coding:utf-8_*_ """ handle all the logging works """ import logging from conf import settings import time import re def logger(log_type): # create logger my_logger = logging.getLogger(log_type) my_logger.setLevel(settings.LOG_LEVEL) # create console handler and set level to debug ch = logging.StreamHandler() ch.setLevel(settings.LOG_LEVEL) # create file handler and set level to warning log_file = "%s/log/%s" % (settings.BASE_DIR, settings.LOG_TYPES[log_type]) fh = logging.FileHandler(log_file) fh.setLevel(settings.LOG_LEVEL) # create formatter formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") # add formatter to ch and fh ch.setFormatter(formatter) fh.setFormatter(formatter) # add ch and fh to logger my_logger.addHandler(ch) my_logger.addHandler(fh) return my_logger def get_log_info(account): """ 将日志的内容进行转换后返回相应账号的转款信息 :param account: 账号参数 :return: """ temp_list = [] log_file = "%s/log/%s" % (settings.BASE_DIR, settings.LOG_TYPES["transaction"]) with open(log_file, "r") as f: for i in f: log_mat = re.search("(d{4}-d{1,2}-d{1,2}sd{1,2}:d{1,2}:d{1,2}).*account:(.*?)s.*action:(.*?)s.*amount:(.*?)s.*interest:(.*)",i) datetime = time.strptime(log_mat.group(1),"%Y-%m-%d %H:%M:%S") account_id = log_mat.group(2) action = log_mat.group(3) amount = log_mat.group(4) interest = log_mat.group(5) if account_id == account: temp_list.append([datetime,action,amount,interest]) return temp_list
main.py
#!_*_coding:utf-8_*_ """" main program handle module , handle all the user interaction stuff """ from core import auth from core import logger from core import parsers from core import transaction from core.auth import login_required from core import actions from conf import settings from core import accounts import random import datetime import re import time # transaction logger trans_logger = logger.logger("transaction") # access logger access_logger = logger.logger("access") # temp account data ,only saves the data in memory user_data = { "account_id": None, "is_authenticated": False, "account_data": None } @login_required def account_info(acc_data): """ print account Information :param acc_data: account summary :return: """ back_flag = False info_temp_list = [] account_data = acc_data["account_data"] info_list = ["name", "age", "phone", "enroll_date", "expire_date", "account", "credit", "balance"] for i in info_list: info_temp_list.append(str(account_data[i])) info = """ --------- BALANCE INFO -------- username : {0} age: {1} phone: {2} enroll date: {3} expire date: {4} card number: {5} credit: {6} balance: {7} """.format(*info_temp_list) print(info) while not back_flag: input_b = input(" 33[33;1mInput "b" return to menu: 33[0m").strip() if input_b == "b": back_flag = True @login_required def repay(acc_data): """ print current balance and let user repay the bill :return: """ account_data = acc_data["account_data"] current_balance = """ --------- BALANCE INFO -------- Credit : %s Balance: %s""".format(account_data["credit"], account_data["balance"]) print(current_balance) back_flag = False while not back_flag: repay_amount = input(" 33[33;1mInput repay amount: 33[0m").strip() if len(repay_amount) > 0 and repay_amount.isdigit(): new_balance = transaction.make_transaction(trans_logger, account_data, "repay", repay_amount) if new_balance: print(""" 33[42;1mNew Balance:%s 33[0m""" % (new_balance["balance"])) elif repay_amount == "b": back_flag = True else: print(" 33[31;1m[%s] is not a valid amount, only accept integer! 33[0m" % repay_amount) @login_required def withdraw(acc_data): """ print current balance and let user do the withdraw action :param acc_data: :return: """ account_data = acc_data["account_data"] current_balance = """ --------- BALANCE INFO -------- Credit : %s Balance: %s""" % (account_data["credit"], account_data["balance"]) print(current_balance) back_flag = False while not back_flag: withdraw_amount = input(" 33[33;1mInput withdraw amount: 33[0m").strip() if len(withdraw_amount) > 0 and withdraw_amount.isdigit(): new_balance = transaction.make_transaction(trans_logger, account_data, "withdraw", withdraw_amount) if new_balance: print(""" 33[42;1mNew Balance:%s 33[0m""" % (new_balance["balance"])) elif withdraw_amount == "b": back_flag = True else: print(" 33[31;1m[%s] is not a valid amount, only accept integer! 33[0m" % withdraw_amount) @login_required def transfer(acc_data): """ transfer accounts :param acc_data: :return: """ account_data = acc_data["account_data"] current_balance = """ --------- BALANCE INFO -------- Credit : %s Balance: %s""" % (account_data["credit"], account_data["balance"]) print(current_balance) back_flag = False while not back_flag: payee_account = input(" 33[33;1mInput payee account: 33[0m").strip() if payee_account == "b": back_flag = True else: base_dir = settings.DATABASE["path"] sql_str = "select * from accounts_table where account = %s" % payee_account sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) res = actions.actions(sql_type, dict_sql) if not res: print(" 33[31;1mThe payee you entered is not a bank user! 33[0m") else: payee_account_data = res[0] trans_amount = input(" 33[33;1mInput transfer amount: 33[0m").strip() if len(trans_amount) > 0 and trans_amount.isdigit(): new_balance = transaction.make_transaction(trans_logger, account_data, "transfer", trans_amount) payee_balance = transaction.make_transaction(trans_logger, payee_account_data, "repay", trans_amount) if new_balance: print(""" 33[42;1mNew Balance:%s 33[0m""" % (new_balance["balance"])) if payee_balance: print(""" 33[42;1mThe money has come to the payee [%s] 33[0m""" % payee_account) elif trans_amount == "b": back_flag = True else: print(" 33[31;1m[%s] is not a valid amount, only accept integer! 33[0m" % trans_amount) @login_required def pay_check(acc_data): """ Account pay check interface :param acc_data: :return: """ back_flag = False account_id = acc_data["account_id"] local_month = time.localtime().tm_mon res = logger.get_log_info(account_id) if res: for result in res: if result[0].tm_mon == local_month: pay_check_info = """ --------- Datatime %s -------- Action: %s Amount: %s Interest: %s""" % (time.strftime("%Y-%m-%d %H:%M:%S",result[0]), result[1], result[2], result[3]) print(pay_check_info) while not back_flag: input_b = input(" 33[33;1mInput "b" return to menu: 33[0m").strip() if input_b == "b": back_flag = True @login_required def logout(acc_data): auth.acc_logout(acc_data, access_logger) def interactive(acc_data): """ Atm interactive main interface :param acc_data: account summary :return: """ menu = u""" ------- Oldboy Bank --------- 33[32;1m1. 账户信息(功能已实现) 2. 还款(功能已实现) 3. 取款(功能已实现) 4. 转账(功能已实现) 5. 账单(功能已实现) 6. 退出(功能已实现) 33[0m""" menu_dic = { "1": account_info, "2": repay, "3": withdraw, "4": transfer, "5": pay_check, "6": logout, } exit_flag = False while not exit_flag: print(menu) user_option = input(">>:").strip() if user_option in menu_dic: menu_dic[user_option](acc_data) else: print(" 33[31;1mOption does not exist! 33[0m") @login_required def add_account(acc_data): exit_flag = False # id, name, age, phone, dept, enroll_date, expire_date, account, password, credit, balance, status, pay_day while not exit_flag: account_name = input(" 33[33;1mInput user name: 33[0m").strip() if len(account_name) > 0: account_name = account_name else: continue account_age = input(" 33[33;1mInput user age: 33[0m").strip() if len(account_name) > 0 and account_age.isdigit(): account_age = account_age else: continue account_phone = input(" 33[33;1mInput user phone number: 33[0m").strip() if len(account_phone) > 0 and account_phone.isdigit() and len(account_phone) == 11: account_phone = account_phone else: continue account_dept = input(" 33[33;1mInput user dept: 33[0m").strip() if len(account_dept) > 0: account_dept = account_dept else: continue account = "".join(str(random.choice(range(10))) for _ in range(5)) # 随机生成5位账号 password = input(" 33[33;1mInput account password: 33[0m").strip() if len(password) == 0: password = "abcde" else: password = password account_enroll_date = datetime.datetime.now().strftime("%Y-%m-%d") # 当前时间为开通时间 account_expire_date = (datetime.datetime.now() + datetime.timedelta(days=(3 * 365))).strftime("%Y-%m-%d") # 3年后为过期时间 #print(account_enroll_date,account_expire_date) account_credit = input(" 33[33;1mInput account credit: 33[0m").strip() if len(account_credit) == 0: account_credit = str(settings.ACCOUNT_DEFAULT["credit"]) else: account_credit = account_credit #print(account) input_list = [account_name, account_age,account_phone,account_dept,account_enroll_date,account_expire_date,account,password,account_credit,account_credit,"0","22"] print(input_list) commit = input(" 33[33;1mCommit account or exit:(Y/N) 33[0m").strip().upper() if commit == "Y": accounts.add_account(*input_list) exit_flag = True else: exit_flag = True @login_required def change_credit(acc_data): """ Change account credit amount :param acc_data: :return: """ exit_flag = False while not exit_flag: dis_account = input(" 33[33;1mInput account: 33[0m").strip() if dis_account == "b": exit_flag = True elif len(dis_account) > 0: if accounts.load_accounts(dis_account): new_credits = input(" 33[33;1mInput new credits amount: 33[0m").strip() if len(new_credits) > 0 and new_credits.isdigit(): accounts.change_account(dis_account, "credit = %s" % new_credits) else: print(" 33[31;1m[%s] is not a valid amount, only accept integer! 33[0m" % new_credits) else: print(" 33[31;1mThe account you entered is not a bank user! 33[0m") @login_required def disable_account(acc_data): """ Disable account :return: """ exit_flag = False while not exit_flag: dis_account = input(" 33[33;1mInput account: 33[0m").strip() if dis_account == "b": exit_flag = True elif len(dis_account) > 0: if accounts.load_accounts(dis_account): accounts.change_account(dis_account, "status = 1") else: print(" 33[31;1mThe account you entered is not a bank user! 33[0m") @login_required def enable_account(acc_data): """ Enable account :return: """ exit_flag = False while not exit_flag: dis_account = input(" 33[33;1mInput account: 33[0m").strip() if dis_account == "b": exit_flag = True elif len(dis_account) > 0: if accounts.load_accounts(dis_account): accounts.change_account(dis_account, "status = 0") else: print(" 33[31;1mThe account you entered is not a bank user! 33[0m") def mg_interactive(acc_data): """ Mange interactive main interface :param acc_data: 返回的用户账号的具体信息 :return: """ menu = u""" ------- Oldboy Bank --------- 33[32;1m1. 添加账号(功能已实现) 2. 修改额度(功能已实现) 3. 禁用账号(功能已实现) 4. 启用账号(功能已实现) 5. 退出(功能已实现) 33[0m""" menu_dic = { "1": add_account, "2": change_credit, "3": disable_account, "4": enable_account, "5": logout, } exit_flag = False while not exit_flag: print(menu) user_option = input(">>:").strip() if user_option in menu_dic: menu_dic[user_option](acc_data) else: print(" 33[31;1mOption does not exist! 33[0m") def run(type): """ this function will be called right a way when the program started, here handles the user interaction stuff :return: """ if type == "atm": acc_data = auth.acc_login(user_data, access_logger, "atm") if user_data["is_authenticated"]: user_data["account_data"] = acc_data interactive(user_data) elif type == "manage": acc_data = auth.acc_login(user_data, access_logger, "manage") if user_data["is_authenticated"]: user_data["account_data"] = acc_data mg_interactive(user_data)
parsers.py
# -*- coding: utf-8 -*- import re #from .help import help def parsers(sql_str, sql_type, base_dir): """ 语法解析函数 :param sql_type: 从main()函数导入的sql语句类型。 :return: parsers_dict[sql_type] 相应的语法解析函数 """ parsers_dict = {"select": select_parser, "add": add_parser, "del": del_parser, "update": update_parser} if sql_type in parsers_dict: return parsers_dict[sql_type](sql_str, sql_type, base_dir) else: return False def select_parser(sql_str, sql_type, base_dir): """ 搜索语句解析函数 :param sql_str: 用户输入的sql语句 :param sql_type: 用户输入的sql语句类型 :param base_dir: 主函数导入的数据库所在路径 :return: """ dict_sql = {} # 创建空字典 command_parse = re.search(r"selects(.*?)sfroms(.*?)swheres(.*)", sql_str, re.I) # 使用正则表达式解析add语法,并且re.I忽略大小写 if command_parse: dict_sql["select"] = command_parse.group(1) dict_sql["from"] = base_dir + command_parse.group(2) # sql字典"from’键添加数据库表文件路径的值 dict_sql["where"] = command_parse.group(3) # sql字典‘where’键添加插入的值 if logic_cal(dict_sql["where"]): # 使用logic_cal函数将where语句语法再次进行解析 dict_sql["where"] = logic_cal(dict_sql["where"]) # 如解析有返回值,将返回值重新作为dict_sql["where"]的值 return dict_sql else: print(help(sql_type)) # 当语法解析不正常答应帮助 else: print(help(sql_type)) # 当语法解析不正常答应帮助 def add_parser(sql_str, sql_type, base_dir): """ 添加语句解析函数 :param sql_str: 用户输入的sql语句 :param sql_type: 用户输入的sql语句类型 :param base_dir: 主函数导入的数据库所在路径 :return: dict_sql 解析后的字典格式sql语句 """ dict_sql = {} command_parse = re.search(r"addstos(.*?)svaluess(.*)", sql_str, re.I) # 使用正则表达式解析add语法,并且re.I忽略大小写 if command_parse: dict_sql["to"] = base_dir + command_parse.group(1) # sql字典"to’键添加数据库表文件路径的值 dict_sql["values"] = command_parse.group(2).split(",") # sql字典‘values’键添加插入的值 return dict_sql else: print(help(sql_type)) # 当语法解析不正常答应帮助 def del_parser(sql_str, sql_type, base_dir): """ 删除语句解析函数 :param sql_str: 用户输入的sql语句 :param sql_type: 用户输入的sql语句类型 :param base_dir: 主函数导入的数据库所在路径 :return: dict_sql 解析后的字典格式sql语句 """ dict_sql = {} command_parse = re.search(r"delsfroms(.*?)swheres(.*)", sql_str, re.I) if command_parse: dict_sql["from"] = base_dir + command_parse.group(1) # sql字典"to’键添加数据库表文件路径的值 dict_sql["where"] = command_parse.group(2) # sql字典‘where’键添加插入的值 if logic_cal(dict_sql["where"]): # 使用logic_cal函数将where语句语法再次进行解析 dict_sql["where"] = logic_cal(dict_sql["where"]) # 如解析有返回值,将返回值重新作为dict_sql["where"]的值 return dict_sql else: print(help(sql_type)) # 当语法解析不正常答应帮助 else: print(help(sql_type)) # 当语法解析不正常答应帮助 def update_parser(sql_str, sql_type, base_dir): """ 更新语句解析函数 :param sql_str: 用户输入的sql语句 :param sql_type: 用户输入的sql语句类型 :param base_dir: 主函数导入的数据库所在路径 :return: dict_sql 解析后的字典格式sql语句 """ dict_sql = {} command_parse = re.search(r"updates(.*?)ssets(.*?)=(.*?)swheres(.*)", sql_str, re.I) if command_parse: dict_sql["update"] = base_dir + command_parse.group(1) # sql字典"to’键添加数据库表文件路径的值 dict_sql["set"] = [command_parse.group(2), "=", command_parse.group(3)] # sql字典‘where’键添加插入的值 dict_sql["where"] = command_parse.group(4) if logic_cal(dict_sql["where"]) and logic_cal(dict_sql["set"]): # 如果where语句、set语句都符合logic_cal中定义的规范 dict_sql["where"] = logic_cal(dict_sql["where"]) # 如解析有返回值,将返回值重新作为dict_sql["where"]的值 dict_sql["set"] = logic_cal(dict_sql["set"]) # 如解析有返回值,将返回值重新作为dict_sql["set"]的值 return dict_sql else: print(help(sql_type)) # 当语法解析不正常答应帮助 else: print(help(sql_type)) # 当语法解析不正常答应帮助 def logic_cal(logic_exp): """ 逻辑函数 :param logic_exp: sql语句中和逻辑判断相关的语句,列表格式。如[‘age",">=",20] 或 [‘dept","like","HR"] :return: logic_exp 经过语法解析后的逻辑判断语句。列表格式。如[‘age","==",20] 或 [‘dept","like","HR"] """ # 表达式列表优化成三个元素,形如[‘age",">=",20] 或 [‘dept","like","HR"] logic_exp = re.search("(.+?)s([=<>]{1,2}|like)s(.+)", "".join(logic_exp)) if logic_exp: logic_exp = list(logic_exp. group(1, 2, 3)) # 取得re匹配的所有值,并作为一个列表 if logic_exp[1] == "=": logic_exp[1] = "==" # 判断逻辑运算的比较符号后的值是否字母,并且用户是否输入了双引号。如没有输入手工添加上双引号。 if not logic_exp[2].isdigit() and not re.search(""(.*?)"", logic_exp[2]): logic_exp[2] = """ + logic_exp[2] + """ return logic_exp else: return False
transaction.py
#!_*_coding:utf-8_*_ from conf import settings from core import accounts from core import logger from core import parsers from core import actions #transaction logger def make_transaction(log_obj, account_data, tran_type, amount, **others): """ deal all the user transactions :param account_data: user account data :param tran_type: transaction type :param amount: transaction amount :param others: mainly for logging usage :return: """ amount = float(amount) if tran_type in settings.TRANSACTION_TYPE: interest = amount * settings.TRANSACTION_TYPE[tran_type]["interest"] old_balance = float(account_data["balance"]) if settings.TRANSACTION_TYPE[tran_type]["action"] == "plus": new_balance = old_balance + amount + interest elif settings.TRANSACTION_TYPE[tran_type]["action"] == "minus": new_balance = old_balance - amount - interest #check credit if new_balance <0: print(""" 33[31;1mYour credit [%s] is not enough for this transaction [-%s], your current balance is [%s]""" %(account_data["credit"],(amount + interest), old_balance )) return account_data["balance"] = new_balance base_dir = settings.DATABASE["path"] sql_str = "update accounts_table set balance = %s where account = %s" % (new_balance, account_data["account"]) sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) actions.actions(sql_type, dict_sql) # accounts.dump_account(account_data) # save the new balance back to file log_obj.info("account:%s action:%s amount:%s interest:%s" % (account_data["account"], tran_type, amount,interest) ) return account_data else: print(" 33[31;1mTransaction type [%s] is not exist! 33[0m" % tran_type)启动命令
启动命令。
python atm.py python manage.py发布信息
- 作者:henryyuan - 日期:2018/03/05 - 版本:Version 1.0 - 工具:PyCharm 2017.3.3 - 版本:Python 3.6.4 - MarkDown工具:pycharm - 流程图工具:ProcessOn新闻
无历史记录
2018-3-5 Version:1.0遇到的问题:
1.) 在main.py接口中的add_account()函数。用于添加账号。需要输入多个input条件。代码如下:
account_name = input(" 33[33;1mInput user name: 33[0m").strip() # 有点重复代码的感觉 if len(account_name) > 0: account_name = account_name else: continue account_age = input(" 33[33;1mInput user age: 33[0m").strip() # 有点重复代码的感觉 if len(account_name) > 0 and account_age.isdigit(): account_age = account_age else: continue account_phone = input(" 33[33;1mInput user phone number: 33[0m").strip() # 有点重复代码的感觉 if len(account_phone) > 0 and account_phone.isdigit() and len(account_phone) == 11: account_phone = account_phone else: continue
每个input语句都需要有if的判断,但当有多个input输入语句时,就会出现过多的重复的if代码。如何减少if语句的代码量。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/44551.html
摘要:利用开发个小型商城我们本期的教程是教大家如何利用开发一个小型的商城这里所说的小型商城只是功能上的简朴。并于年月在许可证下发布。这套框架是以比利时的吉普赛爵士吉他手来命名的。是重量级选手中最有代表性的一位。 利用Django开发个小型商城 我们本期的教程是教大家如何利用Django开发一个小型的商城,这里所说的小型商城只是功能上的简朴。 作者:黄志成(小黄) 作者博客:博客地址 前提 1...
摘要:将数据回写到数据库文件中。通过调用函数的方式记录到数据库中导入登录的用户账号导入购物的记录获取历史的购物记录该函数调用函数和函数。您选购的商品并未在我们的货架中除此,获取商品的售价添加历史购买记录调用函数计算余额调用函数打印用户的当前余额。 程序要求 1、启动程序后,输入用户名密码后,让用户输入工资,然后打印商品列表2、允许用户根据商品编号购买商品3、用户选择商品后,检测余额是否够,够...
摘要:如果你仍然无法抉择,那请选择,毕竟这是未来的趋势,参考知乎回答还是编辑器该如何选我推荐社区版,配置简单功能强大使用起来省时省心,对初学者友好。 这是一篇 Python 入门指南,针对那些没有任何编程经验,从零开始学习 Python 的同学。不管你学习的出发点是兴趣驱动、拓展思维,还是工作需要、想要转行,都可以此文作为一个参考。 在这个信息爆炸的时代,以 Python入门 为关键字搜索出...
摘要:生成器生成器是迭代器,但是只能迭代一次,生成器不会将所有值存储在内存中,而是实时的生成这些值看上去除了用替换了原来的外,它们没什么不同。 这是stackoverflow上一个关于python中yield用法的帖子,这里翻译自投票最高的一个回答,原文链接 here 问题 Python中yield关键字的用途是什么?它有什么作用?例如,我试图理解以下代码 ¹: def _get_c...
摘要:以下这些项目,你拿来学习学习练练手。当你每个步骤都能做到很优秀的时候,你应该考虑如何组合这四个步骤,使你的爬虫达到效率最高,也就是所谓的爬虫策略问题,爬虫策略学习不是一朝一夕的事情,建议多看看一些比较优秀的爬虫的设计方案,比如说。 (一)如何学习Python 学习Python大致可以分为以下几个阶段: 1.刚上手的时候肯定是先过一遍Python最基本的知识,比如说:变量、数据结构、语法...
阅读 2910·2020-01-08 12:17
阅读 1959·2019-08-30 15:54
阅读 1121·2019-08-30 15:52
阅读 1997·2019-08-29 17:18
阅读 1009·2019-08-29 15:34
阅读 2423·2019-08-27 10:58
阅读 1837·2019-08-26 12:24
阅读 342·2019-08-23 18:23