Python Performance Tuning: Key Queries
2022-12-03
python
mongo
> I recently ran into a performance-tuning problem: for a CSV file of roughly 100 GB, check whether each key already exists in MongoDB.

## Baseline

First, a benchmark with no optimization at all:

```python
from datetime import datetime

import pandas as pd

# `file` (path to the CSV) and `mycol` (a pymongo collection handle)
# are assumed to be defined already.
batch_size = 10**4 * 3
find_count = 0
total = 0
exception_count = 0
begin = datetime.now()
max_size = 10**6 * 5

for chunk in pd.read_csv(file, header=None, chunksize=batch_size, names=['key']):
    total += batch_size
    try:
        find_count += mycol.count_documents({'Key': {'$in': chunk.key.tolist()}})
        print('size: {:,}, find: {}, time: {}'.format(total, find_count, datetime.now() - begin))
    except Exception:
        exception_count += 1
    if total > max_size:
        break

print('size: {:,}, batch_size: {:,}, time: {}'.format(total, batch_size, datetime.now() - begin))
```

size: 5,000,000, batch_size: 30,000, time: 0:00:44

5 million rows take 44 seconds.

## Optimization 1: create an index

Create a hashed index on the `Key` field (the same index can also be created from PyMongo; see the first sketch after the references):

```json
{ "Key": "hashed" }
```

size: 5,000,000, batch_size: 30,000, time: 0:00:34

Down to 34 seconds.

## Optimization 2: enable network compression

Install the zstd bindings first:

`pip install zstandard`

```python
import urllib.parse

import pymongo

myclient = pymongo.MongoClient(
    'mongodb://{}:{}@localhost/db?authSource=admin'.format(
        urllib.parse.quote('root'), urllib.parse.quote('example')),
    compressors='zstd')
```

size: 5,000,000, batch_size: 30,000, time: 0:00:20

Down to 20 seconds.

## Optimization 3: index plus network compression

size: 5,000,000, batch_size: 30,000, time: 0:00:09

Combining the two brings it down to 9 seconds.

## Optimization 4: use multiple threads

Use multiple threads to separate reading the CSV from querying MongoDB, so the two run concurrently (a `ThreadPoolExecutor` variant is sketched after the references):

```python
from datetime import datetime
from queue import Queue
from threading import Lock, Thread

import pandas as pd

# Continues from the baseline snippet: `file`, `mycol`, `batch_size`,
# `max_size`, `find_count`, `exception_count`, `total`, and `begin`
# are defined there.
my_queue = Queue(maxsize=10)
num_threads = 2
lock = Lock()

def do_stuff(id, q, mycol):
    global find_count
    global exception_count
    while True:
        try:
            count = mycol.count_documents({'Key': {'$in': q.get()}})
            with lock:
                find_count += count
            print('id: {}, size: {:,}, find: {}, now: {}'.format(id, total, find_count, datetime.now() - begin))
        except Exception:
            exception_count += 1
            print('size exception: {:,}, find: {}, now: {}'.format(batch_size, find_count, datetime.now() - begin))
        q.task_done()

for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(i, my_queue, mycol))
    worker.daemon = True  # setDaemon() is deprecated since Python 3.10
    worker.start()

for chunk in pd.read_csv(file, header=None, chunksize=batch_size, names=['key']):
    total += batch_size
    my_queue.put(chunk.key.tolist())
    if total > max_size:
        break

my_queue.join()
print('size: {:,}, batch_size: {:,}, find: {}, exception: {}, now: {}'.format(total, batch_size, find_count, exception_count, datetime.now() - begin))
```

size: 5,000,000, batch_size: 30,000, time: 0:00:05

Down to 5 seconds.

## Summary

| no optimization | index | network compression | multiple threads | result  | speedup |
| :-------------: | :---: | :-----------------: | :--------------: | :-----: | :-----: |
| v               |       |                     |                  | 0:00:45 |         |
|                 | v     |                     |                  | 0:00:34 | 1.32x   |
|                 |       | v                   |                  | 0:00:20 | 2.25x   |
|                 | v     | v                   |                  | 0:00:09 | 5x      |
|                 | v     | v                   | v                | 0:00:06 | 7.5x    |

## References

* [Mongo index](https://www.mongodb.com/docs/manual/indexes/)
* [MongoDB Network Compression: A Win-Win](https://www.mongodb.com/developer/products/mongodb/mongodb-network-compression/)
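As an aside to optimization 1: the hashed index can be created from PyMongo instead of the mongo shell. A minimal sketch, reusing the credentials from the connection string above; the collection name `keys` is hypothetical:

```python
import pymongo

# Same connection string as in the snippets above.
myclient = pymongo.MongoClient('mongodb://root:example@localhost/db?authSource=admin')
mycol = myclient['db']['keys']  # 'keys' is a hypothetical collection name

# Equivalent to { "Key": "hashed" }; a hashed index supports the equality
# matches that the $in queries above perform.
mycol.create_index([('Key', pymongo.HASHED)])
```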
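The producer/consumer code in optimization 4 can also be written with `concurrent.futures.ThreadPoolExecutor`, which removes the manual queue, lock, and daemon-thread bookkeeping. A sketch under the same assumptions (`file`, `mycol`, and `batch_size` defined as above):

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def count_batch(keys):
    # One $in query per batch; the collection handle is safe to share
    # across threads because MongoClient keeps its own connection pool.
    return mycol.count_documents({'Key': {'$in': keys}})

chunks = pd.read_csv(file, header=None, chunksize=batch_size, names=['key'])
with ThreadPoolExecutor(max_workers=2) as pool:
    find_count = sum(pool.map(count_batch, (c.key.tolist() for c in chunks)))

print('find: {:,}'.format(find_count))
```

One caveat: `Executor.map` submits every batch up front, so for the full 100 GB file the bounded `Queue(maxsize=10)` in the original code controls memory better; the sketch is fine for the 5-million-row benchmark.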
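Finally, `count_documents` only reports how many keys matched, while the stated task is to find out whether given keys exist. If the existing or missing keys themselves are needed, the same batched `$in` pattern works with `find` plus a projection; a sketch reusing `mycol` from above:

```python
def existing_keys(keys):
    # Return the subset of `keys` that is already present in the collection.
    cursor = mycol.find({'Key': {'$in': keys}}, {'Key': 1, '_id': 0})
    return {doc['Key'] for doc in cursor}

# Example: which keys of a chunk are missing from MongoDB?
# missing = set(chunk.key.tolist()) - existing_keys(chunk.key.tolist())
```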
This post is the author's original work; please credit the source when reposting:
http://www.supperxin.com