# Python concurrency practice - demo
# Requirement:
#   Given: appX --request--> api
#   Use multiprocessing to issue concurrent requests against the API.
#   - Given a list of apps, query the API for each app's CPU core count.
from datetime import datetime, timedelta
import random
import time
from multiprocessing import Process


class ProcessTest(object):
    """Concurrently query a monitoring API with multiple processes and bulk-write results.

    Key points: 1) concurrency; 2) batch principle -- read in batches, write in batches.
    Requirement: given ~1000 apps, fetch each app's CPU core count through the API.
    Approach: split the app list into contiguous half-open segments, one per worker
    process, then handle the integer-division remainder in the parent process.
    """

    def __init__(self, mon_day):
        # Day tag (e.g. "2024-01-01") stamped onto every record written to the DB.
        self.mon_day = mon_day

    @staticmethod
    def requests_mon_api(app_id):
        """Query the API for one app.

        Stub implementation: returns ``{app_id: <random core count>}``.
        """
        return {app_id: random.randint(100, 5000)}

    @staticmethod
    def get_app_list():
        """Return the full list of app ids (the sleep simulates a slow upstream fetch)."""
        time.sleep(2)  # simulated latency
        return ["app_" + str(i) for i in range(1000)]

    def records_to_db(self, records):
        """Bulk-insert `records` (stand-in for a Django table bulk_create)."""
        print("[{0}] -------->>>>>>>>>{1}".format(self.mon_day, records))

    def app_cores_to_db(self, app_id, max_retries=None):
        """Fetch one app's record from the API and persist it, retrying on failure.

        :param app_id: app identifier passed to the API.
        :param max_retries: give up (best-effort, no raise) after this many failed
            attempts; ``None`` (default) retries forever, matching the original
            behaviour.
        """
        attempts = 0
        while True:
            try:
                self.records_to_db(self.requests_mon_api(app_id))
                return
            except Exception as e:
                attempts += 1
                print(e.args, "retry", app_id)
                # Optional escape hatch: the original looped forever on a
                # persistently failing app, which would hang the worker.
                if max_retries is not None and attempts >= max_retries:
                    return

    def batch_run(self, start, end, app_arr):
        """Process every app in the half-open slice ``app_arr[start:end]``.

        Bug fix: the original sliced ``app_arr[start:end + 1]`` while callers pass
        ``end = (i + 1) * batch``, so each batch-boundary app was processed (and
        written) twice by two adjacent worker processes.
        """
        for app in app_arr[start:end]:
            self.app_cores_to_db(app)

    def process_run(self, process_num, process_batch, app_arr):
        """Spawn `process_num` workers, one contiguous batch each, and wait for all."""
        process_arr = []
        # from django import db  -- with real Django, close stale connections
        # (db.close_old_connections()) before forking each worker.
        for i in range(process_num):
            start, end = i * process_batch, (i + 1) * process_batch
            p = Process(target=self.batch_run, args=(start, end, app_arr))
            print("第{0}个进程,拉取范围[{1}:{2}],共拉取{3}条记录".format(
                i + 1, start, end, process_batch))
            process_arr.append(p)
        for p in process_arr:
            p.start()
        for p in process_arr:
            p.join()

    def to_db(self):
        """Entry point: fetch the app list, fan out over workers, handle the remainder.

        Bug fix: the original started the remainder at
        ``process_batch * process_num + 1``; with half-open batches (see
        ``batch_run``) the remainder starts exactly at
        ``process_batch * process_num`` -- the ``+ 1`` would skip one app.
        """
        app_arr = self.get_app_list()
        process_num = 15
        process_batch = len(app_arr) // process_num
        self.process_run(process_num=process_num,
                         process_batch=process_batch,
                         app_arr=app_arr)
        # Apps left over after integer division are handled in the parent process.
        for app_id in app_arr[process_batch * process_num:]:
            try:
                self.app_cores_to_db(app_id)
            except Exception as e:
                print(e.args, app_id, "error")


if __name__ == '__main__':
    # timedelta(days=-0) in the original was a no-op; "today" is simply now().
    day = datetime.now().strftime("%Y-%m-%d")
    tp = ProcessTest(mon_day=day)
    tp.to_db()