哈希扩展 --- 海量数据处理
TopK问题
TopK问题就是很多数里面,找最大/最小的前K个数据,之前对于TopK问题,我们使用堆这个数据结构来进行解决:
- 最大的前K个数据,使用小堆(greater)
- 最小的前K个数据,使用大堆(默认)
10亿个整数中求最大的前100个
思想1:对于这个问题,我们可以使用小堆来进行解决,只需要建立100个数的小堆,数据来了,只要比堆顶的数据大就进堆,pop堆顶数据!
#include <iostream>
#include <vector>
#include <queue>
#include <fstream>
#include <sstream>using namespace std;vector<int> findTopK(int k, const string& filename) {priority_queue<int, vector<int>, greater<int>> minHeap;ifstream file(filename);if (!file.is_open()) {cerr << "Error opening file" << endl;return {};}string line;while (getline(file, line)) {istringstream iss(line);int num;while (iss >> num) {if (minHeap.size() < k) {minHeap.push(num);} else if (num > minHeap.top()) {minHeap.pop();minHeap.push(num);}}}vector<int> result(k);for (int i = k - 1; i >= 0; --i) {result[i] = minHeap.top();minHeap.pop();}return result;
}int main() {string filename = "numbers.txt"; // 假设文件名为numbers.txtint k = 100;vector<int> topK = findTopK(k, filename);cout << "Top " << k << " numbers are: ";for (int num : topK) {cout << num << " ";}cout << endl;return 0;
}
思想2:我们对于海量数据,我们可以优先选择使用位图来解决。位图是一种高效处理海量数据的方法,尤其适用于整数范围已知的情况。对于10亿个整数,如果范围在0到10亿之间,可以使用长度为10亿的位数组,每个位对应一个整数,位为1表示该整数出现过,位为0表示未出现。位图的空间复杂度为O(n),对于10亿个整数,位图大小约为1.25GB,适合现代计算机内存。求最大的前100个数时,从高位开始扫描位图,每遇到一个位为1的整数就加入结果列表,直到列表中有100个数为止。这种方法时间效率高,构建和扫描位图的时间复杂度均为O(n)。不过,位图也有局限性,如果整数范围过大或未知,位图的大小可能会超出内存限制,且在整数范围稀疏时效率会降低。
#include <iostream>
#include <vector>
#include <fstream>
#include <bitset>using namespace std;const int MAX_VALUE = 1e9; // 假设整数范围是0到10亿
const int BITMAP_SIZE = (MAX_VALUE + 1) / 8; // 位图大小(以字节为单位)vector<int> findTopKUsingBitmap(int k, const string& filename) {vector<unsigned char> bitmap(BITMAP_SIZE, 0); // 位图初始化为0ifstream file(filename);if (!file.is_open()) {cerr << "Error opening file" << endl;return {};}string line;while (getline(file, line)) {istringstream iss(line);int num;while (iss >> num) {if (num >= 0 && num <= MAX_VALUE) {bitmap[num / 8] |= (1 << (num % 8)); // 设置对应位为1}}}vector<int> result;for (int i = MAX_VALUE; i >= 0 && result.size() < k; --i) {if (bitmap[i / 8] & (1 << (i % 8))) { // 检查对应位是否为1result.push_back(i);}}return result;
}int main() {string filename = "numbers.txt"; // 假设文件名为numbers.txtint k = 100;vector<int> topK = findTopKUsingBitmap(k, filename);cout << "Top " << k << " numbers are: ";for (int num : topK) {cout << num << " ";}cout << endl;return 0;
}
位图遗留问题
给两个文件,分别有100亿个query,只有1G内存,如何找到两个文件的交集
query就是查询,更好的理解就是一个字符串:MySQL语句就是所谓的查询:
分析:假设每个query字符串平均为50字节,那么100亿个query的总大小约为5000亿字节,相当于500G(1G约等于10亿字节)。显然,哈希表、红黑树等数据结构在这种情况下是无法使用的。位图的直接使用者是整型,显然对于字符串来说无从下手!
解决方案1:可以使用布隆过滤器。将一个文件中的query放入布隆过滤器,然后依次查找另一个文件中的query。如果在布隆过滤器中存在,那么这个query就是交集的一部分。不过,布隆过滤器可能会有误判的情况,但它能保证交集一定被找到。(在是∩,不在不是∩)
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <unordered_set>
#include <murmurhash3.h>using namespace std;const int BITMAP_SIZE = 1 << 27; // 128MB
const int HASH_FUNCTIONS = 3;class BloomFilter {
private:vector<bool> bitmap;int size;public:BloomFilter(int size) : size(size), bitmap(size, false) {}void add(const string& query) {for (int i = 0; i < HASH_FUNCTIONS; ++i) {uint32_t hash = MurmurHash3_x86_32(query.c_str(), query.size(), i);bitmap[hash % size] = true;}}bool mightContain(const string& query) const {for (int i = 0; i < HASH_FUNCTIONS; ++i) {uint32_t hash = MurmurHash3_x86_32(query.c_str(), query.size(), i);if (!bitmap[hash % size]) return false;}return true;}
};vector<string> findIntersection(const string& file1, const string& file2) {BloomFilter bloomFilter(BITMAP_SIZE);unordered_set<string> intersection;ifstream file1Stream(file1);string query;while (getline(file1Stream, query)) {bloomFilter.add(query);}ifstream file2Stream(file2);while (getline(file2Stream, query)) {if (bloomFilter.mightContain(query)) {intersection.insert(query);}}return vector<string>(intersection.begin(), intersection.end());
}int main() {string file1 = "queries1.txt";string file2 = "queries2.txt";vector<string> intersection = findIntersection(file1, file2);cout << "Intersection queries: ";for (const auto& query : intersection) {cout << query << " ";}cout << endl;return 0;
}
解决方案2:(哈希切分 != 平均切分)
哈希切分:内存的访问速度远大于硬盘,因此对于大文件,可以考虑将其切分为小文件,然后放入内存处理。但不能简单地平均切分,因为这样效率仍然很低。因为A0要分别和B(0~999)进行找交集,整体下来就是的时间复杂度!
哈希切分方法:依次读取文件中的query字符串,计算i = HashFunc(query) % N
,其中N
是准备切分的小文件数量,N
的大小取决于内存能容纳的大小。将query放入第i
号小文件。这样,A和B中相同的query计算出的hash值i
是相同的,相同的query会进入编号相同的小文件。因此,可以直接对编号相同的小文件求交集,而不需要交叉查找,这样效率会大幅提升。--- 就可以理解为Ai和Bi编号相同的找交集,这样,时间复杂度就从减低为了
了!这时候就可以放入两个set中,然后在找交集了!
本质:在哈希切分过程中,相同的query一定会进入同一个文件Ai
和Bi
,而不会出现A中的query进入Ai
,而B中的相同query进入Bj
的情况(其中i
和j
是不同的整数)。因此,只需要对Ai
和Bi
求交集即可。因为我们双方使用的是同一个哈希函数!两个query是相同的话,就相当于给了哈希函数两个相同的参数,返回的结果肯定是相同的!
哈希切分的问题:每个小文件可能不是均匀切分的,可能会导致某个小文件过大,内存放不下。这种情况有两种可能:
- 这个小文件中大部分是同一个query。这种情况下,可以将其放入内存的set中,因为set会去重。
- 这个小文件由很多不同的query组成,本质是这些query发生了冲突。这种情况下,需要换一个哈希函数进行二次哈希切分。
处理方法:如果遇到大于1G的小文件,可以尝试将其放入set中求交集。如果在set插入数据时抛出异常(set插入数据抛出异常只可能是申请内存失败,不会有其他情况),那么说明内存放不下,属于情况2,此时需要换一个哈希函数进行二次哈希切分,然后再对应求交集。(先insert在判断,不需要直接就去看文件的大小,先直接无脑insert!)
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <unordered_set>
#include <unordered_map>
#include <functional>
#include <sstream>
#include <sys/stat.h>using namespace std;const int HASH_BUCKETS = 500; // 假设切分为500个小文件string hashFunction(const string& query, int bucketCount) {hash<string> hasher;return to_string(hasher(query) % bucketCount);
}void splitFile(const string& filename, int bucketCount) {unordered_map<string, ofstream> files;ifstream inputFile(filename);string query;while (getline(inputFile, query)) {string hashValue = hashFunction(query, bucketCount);if (files.find(hashValue) == files.end()) {files[hashValue].open("bucket_" + hashValue + ".txt");}files[hashValue] << query << endl;}for (auto& pair : files) {pair.second.close();}
}vector<string> findIntersection(const string& file1, const string& file2) {splitFile(file1, HASH_BUCKETS);splitFile(file2, HASH_BUCKETS);vector<string> intersection;for (int i = 0; i < HASH_BUCKETS; ++i) {string bucket1 = "bucket_" + to_string(i) + ".txt";string bucket2 = "bucket_" + to_string(i) + ".txt";unordered_set<string> set1, set2;ifstream file1Stream(bucket1);ifstream file2Stream(bucket2);string query;while (getline(file1Stream, query)) {set1.insert(query);}while (getline(file2Stream, query)) {set2.insert(query);}for (const auto& query : set1) {if (set2.find(query) != set2.end()) {intersection.push_back(query);}}remove(bucket1.c_str());remove(bucket2.c_str());}return intersection;
}int main() {string file1 = "queries1.txt";string file2 = "queries2.txt";vector<string> intersection = findIntersection(file1, file2);cout << "Intersection queries: ";for (const auto& query : intersection) {cout << query << " ";}cout << endl;return 0;
}
给一个超过100G大小的log file,log中存着ip地址,设计算法找到出现次数最多的ip地址以及出现次数前10的ip地址
本题的思路与上题类似。依次读取文件中的query,计算i = HashFunc(query) % 500
,将query放入第i
号小文件。然后,依次使用map<string, int>
对每个小文件Ai
统计ip的次数,同时求出出现次数最多的ip或者TopK ip。在哈希切分过程中,相同的ip一定会进入同一个文件Ai
,而不会出现同一个ip进入Ai
和Aj
的情况。因此,对每个Ai
进行统计次数是准确的。
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <unordered_map>
#include <queue>
#include <sstream>
#include <algorithm>using namespace std;const int HASH_BUCKETS = 500; // 假设切分为500个小文件
const int K = 10; // 找出现次数最多的前10个IPstring hashFunction(const string& ip, int bucketCount) {hash<string> hasher;return to_string(hasher(ip) % bucketCount);
}void splitFile(const string& filename, int bucketCount) {unordered_map<string, ofstream> files;ifstream inputFile(filename);string ip;while (getline(inputFile, ip)) {string hashValue = hashFunction(ip, bucketCount);if (files.find(hashValue) == files.end()) {files[hashValue].open("bucket_" + hashValue + ".txt");}files[hashValue] << ip << endl;}for (auto& pair : files) {pair.second.close();}
}vector<pair<string, int>> findTopKIPs(const string& filename) {splitFile(filename, HASH_BUCKETS);priority_queue<pair<int, string>, vector<pair<int, string>>, greater<pair<int, string>>> minHeap;for (int i = 0; i < HASH_BUCKETS; ++i) {string bucket = "bucket_" + to_string(i) + ".txt";ifstream bucketFile(bucket);unordered_map<string, int> ipCount;string ip;while (getline(bucketFile, ip)) {ipCount[ip]++;}for (const auto& pair : ipCount) {if (minHeap.size() < K) {minHeap.push({pair.second, pair.first});} else if (pair.second > minHeap.top().first) {minHeap.pop();minHeap.push({pair.second, pair.first});}}remove(bucket.c_str());}vector<pair<string, int>> result;while (!minHeap.empty()) {result.push_back({minHeap.top().second, minHeap.top().first});minHeap.pop();}reverse(result.begin(), result.end());return result;
}int main() {string filename = "log.txt"; // 假设日志文件名为log.txtvector<pair<string, int>> topKIPs = findTopKIPs(filename);cout << "Top " << K << " IPs and their counts: " << endl;for (const auto& pair : topKIPs) {cout << pair.first << ": " << pair.second << endl;}return 0;
}