"""Dump the CZ88 "qqwry" IP-location database into a MySQL table.

Preparation
-----------
1. Python 3.7 + an IDE
2. Internet access

Acknowledgements
----------------
This project borrows from qqwry-py3:
https://pypi.org/project/qqwry-py3/

Author's notes
--------------
It took a whole afternoon, but it finally works. The parsing part of
qqwry-py3 is very well written; the update part, however, sends a
User-Agent header that is apparently outdated and no longer accepted, so
it is replaced here. The database is a single file organised through an
index, and following its jumps back and forth is confusing, which made
dumping it hard to figure out. The goal of this script is not to look up
single addresses but to dump every record, so the data can be used freely
without depending on the original software.

Statement
---------
Taken from the author's own CSDN post:
https://blog.csdn.net/Karuseny/article/details/106816511?spm=1001.2014.3001.5501

Original page footer: (c) reposting permitted with proper attribution;
if you found the article useful, feel free to tip the author.

Table schema (SQL file)
-----------------------
    CREATE TABLE `ipdb` (
      `startip` varchar(15) DEFAULT NULL,
      `endip` varchar(15) DEFAULT NULL,
      `startip_iton` varchar(11) DEFAULT NULL,
      `endip_iton` varchar(11) DEFAULT NULL,
      `address` varchar(256) DEFAULT NULL,
      KEY `s` (`startip`),
      KEY `e` (`endip`),
      KEY `l` (`address`),
      KEY `si` (`startip_iton`),
      KEY `ei` (`endip_iton`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""

import os
import re
import socket
import struct
import warnings
import zlib
from typing import Optional, Union

# NOTE: the third-party modules (requests, MySQLdb) and the author's own
# ``conf`` package are imported inside the functions that need them, so the
# parser below can be imported and used without them being installed.

ver_url = 'http://update.cz88.net/ip/copywrite.rar'
db_url = 'http://update.cz88.net/ip/qqwry.rar'

# Request headers captured from network traffic; the update server rejects
# the User-Agent that qqwry-py3 sends.
headers = {
    'User-agent': 'Mozilla/3.0 (compatible; Indy Library)',
    'Accept': 'text/html, */*',
    'Host': 'update.cz88.net',
}

# Matches a date written as e.g. "2020年06月15日" (year / month / day).
_VERSION_DATE_RE = re.compile(r'(20\d{2})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日')

# Number of leading bytes of the downloaded database that are XOR-obfuscated.
_ENCODED_HEAD_SIZE = 0x200


def _connect_db():
    """Open a new MySQL connection from the settings returned by conf.getdb()."""
    import MySQLdb
    import conf  # the author's own configuration package

    info = conf.getdb()
    return MySQLdb.connect(info['ip'], info['user'], info['password'],
                           info['db'], charset=info['charset'])


def int3(data, offset):
    """Return the 3-byte little-endian unsigned integer at data[offset].

    Taken from qqwry-py3.
    """
    return data[offset] + (data[offset + 1] << 8) + \
           (data[offset + 2] << 16)


def int4(data, offset):
    """Return the 4-byte little-endian unsigned integer at data[offset].

    Taken from qqwry-py3.
    """
    return data[offset] + (data[offset + 1] << 8) + \
           (data[offset + 2] << 16) + (data[offset + 3] << 24)


class wrydb:
    """In-memory reader for a qqwry.dat file, with a dump-to-MySQL method.

    Based on qqwry-py3, with small modifications by the author.
    """

    def __init__(self):
        self.clear()

    def clear(self):
        """Drop any loaded data and reset every index field."""
        self.idx1 = None
        self.idx2 = None
        self.idxo = None

        self.data = None
        self.index_begin = -1
        self.index_end = -1
        self.index_count = -1

        self.__fun = None

    def load_file(self, filename: Union[str, bytes], loadindex: bool = False) -> bool:
        """Load a database from a file path (str) or from raw bytes.

        Returns True on success. On any failure a message is printed (except
        for an unsupported argument type), the instance is left cleared and
        False is returned.
        """
        self.clear()

        if isinstance(filename, bytes):
            buffer = filename
            filename = 'memory data'
        elif isinstance(filename, str):
            try:
                # The whole file is read into memory; no handle is kept open.
                with open(filename, 'rb') as f:
                    buffer = f.read()
            except (OSError, ValueError) as e:
                print('[!] Open or load failed:', e)
                return False
        else:
            return False

        if len(buffer) < 8:
            print('[!] %s load failed, file only %d bytes' %
                  (filename, len(buffer)))
            return False

        # The header holds the offsets of the first and the last index entry;
        # each index entry is 7 bytes (4-byte start IP + 3-byte record offset).
        index_begin = int4(buffer, 0)
        index_end = int4(buffer, 4)
        if index_begin > index_end or \
           (index_end - index_begin) % 7 != 0 or \
           index_end + 7 > len(buffer):
            print('[!] %s index error' % filename)
            return False

        self.data = buffer
        self.index_begin = index_begin
        self.index_end = index_end
        self.index_count = (index_end - index_begin) // 7 + 1

        if not loadindex:
            print('[#] %s %s bytes, %d segments.' %
                  (filename, format(len(buffer), ','), self.index_count))
        self.__fun = self.raw_search
        return True

    def __get_addr(self, offset):
        """Decode the two address strings stored at ``offset``.

        Returns them joined by a single space, decoded as gb18030.
        """
        # Mode 0x01: the whole address lives at another offset.
        mode = self.data[offset]
        if mode == 1:
            offset = int3(self.data, offset + 1)
            mode = self.data[offset]

        # First string: mode 0x02 means it is stored at a redirected offset.
        if mode == 2:
            off1 = int3(self.data, offset + 1)
            c = self.data[off1:self.data.index(b'\x00', off1)]
            offset += 4
        else:
            c = self.data[offset:self.data.index(b'\x00', offset)]
            offset += len(c) + 1

        # Second string, optionally redirected the same way.
        if self.data[offset] == 2:
            offset = int3(self.data, offset + 1)
        p = self.data[offset:self.data.index(b'\x00', offset)]

        return str(c.decode('gb18030', errors='replace')) + " " + \
               str(p.decode('gb18030', errors='replace'))

    def raw_search(self, ip: int) -> Optional[str]:
        """Binary-search the index for integer ``ip``.

        Returns the address string of the segment containing ``ip``, or None
        when ``ip`` falls outside the segment that was found.
        """
        l = 0
        r = self.index_count
        while r - l > 1:
            m = (l + r) // 2
            offset = self.index_begin + m * 7
            new_ip = int4(self.data, offset)

            if ip < new_ip:
                r = m
            else:
                l = m

        offset = self.index_begin + 7 * l
        ip_begin = int4(self.data, offset)

        offset = int3(self.data, offset + 4)
        ip_end = int4(self.data, offset)

        if ip_begin <= ip <= ip_end:
            return self.__get_addr(offset + 4)
        return None

    def toip(self, hexip: int) -> str:
        """Convert a 32-bit integer into dotted-quad notation."""
        return socket.inet_ntoa(struct.pack(">I", hexip))

    def dump(self):
        """Replace the contents of the ``ipdb`` table with every loaded record.

        Asks for confirmation on stdin before deleting anything. Returns None
        when nothing is loaded or the user declines, and an empty string after
        a completed dump (the original return value, kept for compatibility).
        """
        # Refuse to wipe the table when there is nothing to insert afterwards.
        if self.data is None or self.index_count <= 0:
            print("[!] No database loaded, nothing to dump")
            return None

        print("[!] Now will delete all record for ipdb table, please press [Y/y] to confirm, if you input [n/N] to exit")
        while True:
            cf = input("[@]y/n? : ")
            if cf == 'y' or cf == 'Y':
                break
            if cf == 'n' or cf == 'N':
                return None

        print("[#] progressing...")
        total = self.index_count
        sql = "insert into ipdb(startip, endip, startip_iton, endip_iton, address) values (%s,%s,%s,%s,%s)"

        db = _connect_db()
        try:
            cursor = db.cursor()
            cursor.execute("delete from ipdb")
            db.commit()
            print("[#] Successfully deleted all records for ipdb table")

            batch = []
            # Walk the index sequentially (same decoding as raw_search).
            for m in range(total):
                offset = self.index_begin + m * 7
                ip_begin = int4(self.data, offset)
                offset = int3(self.data, offset + 4)
                ip_end = int4(self.data, offset)
                address = self.__get_addr(offset + 4)
                batch.append((self.toip(ip_begin), self.toip(ip_end),
                              ip_begin, ip_end, address))

                # Flush and commit at the same points the row-by-row version
                # committed: every 1000th index and on the last record.
                if m % 1000 == 0 or m == total - 1:
                    cursor.executemany(sql, batch)
                    db.commit()
                    batch = []
                    print("\rNow Progress: " + str(m) + ", Total number: " + str(total) +
                          " % : " + str("%.3f" % (m / total * 100)), end=" ")
            cursor.close()
        finally:
            db.close()
        return ""


def get_lasetst_version() -> str:
    """Fetch the version file and return its date as "year-month-day".

    Raises ValueError when no date of the form "YYYY年MM月DD日" is found.
    Network errors from requests propagate to the caller.
    """
    import requests

    res = requests.get(ver_url, headers=headers, timeout=5)
    res.encoding = "gbk"
    match = _VERSION_DATE_RE.search(res.text)
    if match is None:
        raise ValueError("no release date found in version file")
    return "-".join(match.groups())


def _download(url, timeout=60):
    """Return the response body for ``url``, or None when the request fails."""
    import requests

    try:
        return requests.get(url, headers=headers, timeout=timeout).content
    except requests.RequestException:
        return None


def _failure(message):
    """Emit ``message`` as a RuntimeWarning and return False."""
    warnings.warn(message, RuntimeWarning)
    return False


def get_lasetst_db(filename):
    """Download, decode and unpack the latest database.

    filename:
        None - return the unpacked database as bytes.
        str  - write the database to that path and return its size in bytes.

    Every failure (download, header check, size check, unpack, save, or an
    unsupported ``filename`` type) emits a RuntimeWarning and returns False.
    """
    print("[#] Downloading ver file...")
    data = _download(ver_url)
    print("[#] Checking ver file...")
    if not data:
        return _failure("[#] Download verinfo failure")
    if len(data) <= 24 or data[:4] != b'CZIP':
        return _failure("[#] Decode verinfo failure")
    _version, unknown1, size, _unknown2, key = \
        struct.unpack_from('<IIIII', data, 4)
    if unknown1 != 1:
        return _failure("[#] Decode verinfo failure")

    print("[#] Downloading db file...")
    data = _download(db_url)
    print("[#] Checking db file...")
    if not data:
        return _failure("[#] Download dbinfo failure")
    if size != len(data) or len(data) < _ENCODED_HEAD_SIZE:
        return _failure("[#] File check failure")

    # The first 0x200 bytes are XOR-ed with a key stream seeded from the
    # version file; the remainder is plain zlib data.
    head = bytearray(_ENCODED_HEAD_SIZE)
    for i in range(_ENCODED_HEAD_SIZE):
        key = (key * 0x805 + 1) & 0xff
        head[i] = data[i] ^ key
    data = head + data[_ENCODED_HEAD_SIZE:]

    print("[#] Unpacking file...")
    try:
        data = zlib.decompress(data)
    except zlib.error:
        return _failure("[#] Unpack failure")

    if filename is None:
        return data
    if isinstance(filename, str):
        print("[#] Saving file...")
        try:
            with open(filename, 'wb') as f:
                f.write(data)
        except OSError:
            return _failure("[#] File save failure")
        return len(data)
    return _failure("[#] File save failure")


def update_mydb():
    """Load 'lastest.dat' and dump it into MySQL; do nothing if loading fails."""
    fn = 'lastest.dat'
    q = wrydb()
    # load_file prints its own diagnostics; never dump a database that
    # failed to load.
    if not q.load_file(fn):
        return
    q.dump()
    q.clear()


def autorun():
    """Download the latest database and, on success, load it into MySQL."""
    if get_lasetst_db('lastest.dat'):
        update_mydb()
    else:
        print("[#] download failed")


if __name__ == "__main__":
    autorun()