redis多线程操作
今天更新一个 Redis 多线程写入示例（生产者-消费者模式），代码可直接复用。
import redis, os, threading, queue
import pandas as pd# 创建一个任务队列
# Shared task queue: the producer thread puts items on it and the consumer
# threads take them off.
task_queue = queue.Queue()


def read_excel(folder_path, score='20230907'):
    """Collect the 'product' column of every workbook under *folder_path*.

    Walks *folder_path* recursively, reads each file with ``pd.read_excel``
    and gathers the values of its ``'product'`` column. The full list and its
    length are printed for inspection.

    Args:
        folder_path: Root directory to walk.
        score: Value paired with every product in the returned dicts.
            Defaults to the originally hard-coded ``'20230907'``.

    Returns:
        A list with one single-entry dict ``{product: score}`` per collected
        value, in the order the values were read. This is the mapping shape
        that ``RedisClass.insert_redis`` passes to ``zadd``.

    Raises:
        KeyError: If a workbook has no ``'product'`` column.
    """
    total_list = []
    for dirpath, _dirnames, filenames in os.walk(folder_path):
        for filename in filenames:
            # Excel leaves "~$<name>" owner/lock files next to workbooks that
            # are currently open; they are not readable workbooks and would
            # make pd.read_excel raise, so skip them.
            if filename.startswith('~$'):
                continue
            file_path = os.path.join(dirpath, filename)
            df_total = pd.read_excel(file_path)
            total_list.extend(df_total['product'].values.tolist())
    print(total_list)
    print(len(total_list))
    return [{product: score} for product in total_list]


class RedisClass:
    """Small helper around one Redis sorted set (all calls are Z* commands)."""

    def __init__(self, db_key, db_index, db_host='*.*.*.*', db_port=6379,
                 db_password='password', filter_start_index=0,
                 filter_end_index=0):
        """Store the connection settings and build the Redis client.

        Args:
            db_key: Key of the sorted set every method operates on.
            db_index: Redis database number passed as ``db``.
            db_host: Redis host (the default is a placeholder).
            db_port: Redis port.
            db_password: Redis password (the default is a placeholder).
            filter_start_index: Start index used by ``read_redis``.
            filter_end_index: End index used by ``read_redis``. When both
                indexes are 0, ``read_redis`` reads the whole set.
        """
        self.db_key = db_key
        self.db_index = db_index
        self.db_host = db_host
        self.db_port = db_port
        self.db_password = db_password
        self.filter_start_index = filter_start_index
        self.filter_end_index = filter_end_index
        self.redis_pool = redis.ConnectionPool(
            host=self.db_host,
            port=self.db_port,
            password=self.db_password,
            db=self.db_index,
        )
        self.redis_conn = redis.Redis(connection_pool=self.redis_pool)

    def count_redis_data(self):
        """Return the number of members in the sorted set (``ZCARD``)."""
        return self.redis_conn.zcard(self.db_key)

    def read_redis(self):
        """Return members of the sorted set as UTF-8 decoded strings.

        Uses ``filter_start_index`` / ``filter_end_index`` as the ``ZRANGE``
        bounds. When both are 0 the current cardinality is used as the end
        index, i.e. the whole set is read. The end index used is printed.
        """
        start = self.filter_start_index
        end = self.filter_end_index
        if start == 0 and end == 0:
            # Keep the computed end index local: storing it on the instance
            # would freeze the range at the first call's size and later calls
            # would miss members added in between.
            end = self.redis_conn.zcard(self.db_key)
        print('查询到的数量为: {}'.format(end))
        res_list = self.redis_conn.zrange(self.db_key, start, end)
        return [res.decode('utf-8') for res in res_list]

    def read_redis_by_score(self, zset_score):
        """Return the members whose score equals *zset_score*, decoded as UTF-8."""
        res_list = self.redis_conn.zrangebyscore(
            self.db_key, zset_score, zset_score
        )
        return [res.decode('utf-8') for res in res_list]

    def insert_redis(self, redis_dict):
        """Add the ``{member: score}`` entries of *redis_dict* to the sorted set.

        Returns:
            True once ``zadd`` has completed. Errors raised by the client are
            not caught and propagate to the caller.
        """
        self.redis_conn.zadd(self.db_key, redis_dict)
        return True


# Producer thread class
class ProducerThread(threading.Thread):
    """Thread that puts every item of an iterable onto ``task_queue``."""

    def __init__(self, mysql_pro_info):
        """Remember the items to enqueue.

        Args:
            mysql_pro_info: Iterable of items; each one is put on
                ``task_queue`` unchanged when the thread runs.
        """
        super().__init__()
        self.mysql_pro_info = mysql_pro_info

    def run(self):
        """Enqueue each item and print a line naming this thread and the item."""
        for item in self.mysql_pro_info:
            task_queue.put(item)
            print(f"Produced by {self.name}: {item}")


# Sentinel put on the queue to tell one consumer thread to stop. Compared by
# identity, so it can never collide with a real work item.
_STOP = object()

# Number of consumer threads started by the script entry point.
CONSUMER_COUNT = 100


class ConsumerThread(threading.Thread):
    """Thread that takes items off ``task_queue`` and writes them to Redis."""

    def run(self):
        """Insert queued items into the 'ahrefs_filter' sorted set (db 9).

        Blocks on ``task_queue.get()`` and passes each item to
        ``RedisClass.insert_redis`` until the ``_STOP`` sentinel is received.
        Without a sentinel the thread keeps waiting, as before.
        """
        redis_obj = RedisClass('ahrefs_filter', 9)
        while True:
            item = task_queue.get()
            if item is _STOP:
                break
            print(item)
            redis_obj.insert_redis(item)


if __name__ == '__main__':
    # 1 - Read the data from the Excel files and store it in Redis.
    folder_path = r'C:\Users\admin\Desktop\0905型号'
    total_list = read_excel(folder_path)

    producer_thread = ProducerThread(total_list)
    producer_thread.start()

    consumer_threads = [ConsumerThread() for _ in range(CONSUMER_COUNT)]
    for consumer_thread in consumer_threads:
        consumer_thread.start()

    # Once the producer has queued everything, queue one sentinel per consumer
    # behind the real items so every consumer drains the work and then exits;
    # otherwise the non-daemon consumers would block on get() forever and the
    # process would never terminate.
    producer_thread.join()
    for _ in consumer_threads:
        task_queue.put(_STOP)
    for consumer_thread in consumer_threads:
        consumer_thread.join()