DDR爱好者之家 Design By 杰米
本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下:
import simplejson import urllib import urllib2 import os, sys import logging try: import sqlite3 except ImportError: sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. " \ "Please verify your installation. Exiting...\n") sys.exit(-1) MD5 = "5248f774d2ee0a10936d0b1dc89107f1" MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6" #do not have report on virustotal.com APIKEY = "xxxxxxxxxxxxxxxxxx"用自己的 class VirusTotalDatabase: """ Database abstraction layer. """ def __init__(self, db_file): log = logging.getLogger("Database.Init") self.__dbfile = db_file self._conn = None self._cursor = None # Check if SQLite database already exists. If it doesn't exist I invoke # the generation procedure. if not os.path.exists(self.__dbfile): if self._generate(): print("Generated database \"%s\" which didn't" \ " exist before." % self.__dbfile) else: print("Unable to generate database") # Once the database is generated of it already has been, I can # initialize the connection. try: self._conn = sqlite3.connect(self.__dbfile) self._cursor = self._conn.cursor() except Exception, why: print("Unable to connect to database \"%s\": %s." % (self.__dbfile, why)) log.debug("Connected to SQLite database \"%s\"." % self.__dbfile) def _generate(self): """ Creates database structure in a SQLite file. """ if os.path.exists(self.__dbfile): return False db_dir = os.path.dirname(self.__dbfile) if not os.path.exists(db_dir): try: os.makedirs(db_dir) except (IOError, os.error), why: print("Something went wrong while creating database " \ "directory \"%s\": %s" % (db_dir, why)) return False conn = sqlite3.connect(self.__dbfile) cursor = conn.cursor() cursor.execute("CREATE TABLE virustotal (\n" \ " id INTEGER PRIMARY KEY,\n" \ " md5 TEXT NOT NULL,\n" \ " Kaspersky TEXT DEFAULT NULL,\n" \ " McAfee TEXT DEFAULT NULL,\n" \ " Symantec TEXT DEFAULT NULL,\n" \ " Norman TEXT DEFAULT NULL,\n" \ " Avast TEXT DEFAULT NULL,\n" \ " NOD32 TEXT DEFAULT NULL,\n" \ " BitDefender TEXT DEFAULT NULL,\n" \ " Microsoft TEXT DEFAULT NULL,\n" \ " Rising TEXT DEFAULT NULL,\n" \ " Panda TEXT DEFAULT NULL\n" \ ");") print "create db:%s sucess" % self.__dbfile return True def _get_task_dict(self, row): try: task = {} task["id"] = row[0] task["md5"] = row[1] task["Kaspersky"] = row[2] task["McAfee"] = row[3] task["Symantec"] = row[4] task["Norman"] = row[5] task["Avast"] = row[6] task["NOD32"] = row[7] task["BitDefender"] = row[8] task["Microsoft"] = row[9] task["Rising"] = row[10] task["Panda"] = row[11] return task except Exception, why: return None def add_sample(self, md5, virus_dict): """ """ task_id = None if not self._cursor: return None if not md5 or md5 == "": return None Kaspersky = virus_dict.get("Kaspersky", None) McAfee = virus_dict.get("McAfee", None) Symantec = virus_dict.get("Symantec", None) Norman = virus_dict.get("Norman", None) Avast = virus_dict.get("Avast", None) NOD32 = virus_dict.get("NOD32", None) BitDefender = virus_dict.get("BitDefender", None) Microsoft = virus_dict.get("Microsoft", None) Rising = virus_dict.get("Rising", None) Panda = virus_dict.get("Panda", None) self._conn.text_factory = str try: self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ", (md5,)) sample_row = self._cursor.fetchone() except sqlite3.OperationalError, why: print "sqlite3 error:%s\n" % str(why) return False if sample_row: try: sample_row = sample_row[0] self._cursor.execute("UPDATE virustotal SET Kaspersky=", (Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, Microsoft,\ Rising, Panda, sample_row)) self._conn.commit() task_id = sample_row except sqlite3.OperationalError, why: print("Unable to update database: %s." % why) return False else: #the sample not in the database try: self._cursor.execute("INSERT INTO virustotal " \ "(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender,\ Microsoft, Rising, Panda) " \ "VALUES (", (md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender,\ Microsoft, Rising, Panda)) self._conn.commit() task_id = self._cursor.lastrowid except sqlite3.OperationalError, why: print "why",str(why) return None print "add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id)) return task_id def get_sample(self): """ Gets a task from pending queue. """ log = logging.getLogger("Database.GetTask") if not self._cursor: log.error("Unable to acquire cursor.") return None # Select one item from the queue table with higher priority and older # addition date which has not already been processed. try: self._cursor.execute("SELECT * FROM virustotal " \ #"WHERE lock = 0 " \ #"AND status = 0 " \ "ORDER BY id, added_on LIMIT 1;") except sqlite3.OperationalError, why: log.error("Unable to query database: %s." % why) return None sample_row = self._cursor.fetchone() if sample_row: return self._get_task_dict(sample_row) else: return None def search_md5(self, md5): """ """ if not self._cursor: return None if not md5 or len(md5) != 32: return None try: self._cursor.execute("SELECT * FROM virustotal " \ "WHERE md5 = " \ #"AND status = 1 " \ "ORDER BY id DESC;", (md5,)) except sqlite3.OperationalError, why: return None task_dict = {} for row in self._cursor.fetchall(): task_dict = self._get_task_dict(row) #if task_dict: #tasks.append(task_dict) return task_dict class VirusTotal: """""" def __init__(self, md5): """Constructor""" self._virus_dict = {} self._md5 = md5 self._db_file = r"./db/virustotal.db" self.get_report_dict() def repr(self): return str(self._virus_dict) def submit_md5(self, file_path): import postfile #submit the file FILE_NAME = os.path.basename(file_path) host = "www.virustotal.com" selector = "https://www.virustotal.com/vtapi/v2/file/scan" fields = [("apikey", APIKEY)] file_to_send = open(file_path, "rb").read() files = [("file", FILE_NAME, file_to_send)] json = postfile.post_multipart(host, selector, fields, files) print json pass def get_report_dict(self): result_dict = {} url = "https://www.virustotal.com/vtapi/v2/file/report" parameters = {"resource": self._md5, "apikey": APIKEY} data = urllib.urlencode(parameters) req = urllib2.Request(url, data) response = urllib2.urlopen(req) json = response.read() response_dict = simplejson.loads(json) if response_dict["response_code"]: #has result scans_dict = response_dict.get("scans", {}) for anti_virus_comany, virus_name in scans_dict.iteritems(): if virus_name["detected"]: result_dict.setdefault(anti_virus_comany, virus_name["result"]) return result_dict def write_to_db(self): """""" db = VirusTotalDatabase(self._db_file) virus_dict = self.get_report_dict() db.add_sample(self._md5, virus_dict)
使用方法如下:
config = {'input':"inputMd5s"} fp = open(config['input'], "r") content = fp.readlines() MD5S = [] for md5 in ifilter(lambda x:len(x)>0, imap(string.strip, content)): MD5S.append(md5) print "MD5S",MD5S fp.close() from getVirusTotalInfo import VirusTotal #得到扫描结果并写入数库 for md5 in MD5S: virus_total = VirusTotal(md5) virus_total.write_to_db()
希望本文所述对大家的Python程序设计有所帮助。
DDR爱好者之家 Design By 杰米
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
DDR爱好者之家 Design By 杰米
暂无评论...
P70系列延期,华为新旗舰将在下月发布
3月20日消息,近期博主@数码闲聊站 透露,原定三月份发布的华为新旗舰P70系列延期发布,预计4月份上市。
而博主@定焦数码 爆料,华为的P70系列在定位上已经超过了Mate60,成为了重要的旗舰系列之一。它肩负着重返影像领域顶尖的使命。那么这次P70会带来哪些令人惊艳的创新呢?
根据目前爆料的消息来看,华为P70系列将推出三个版本,其中P70和P70 Pro采用了三角形的摄像头模组设计,而P70 Art则采用了与上一代P60 Art相似的不规则形状设计。这样的外观是否好看见仁见智,但辨识度绝对拉满。
更新日志
2025年01月09日
2025年01月09日
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]