|
| 1 | +# -*- coding: utf_8 -*- |
| 2 | +# Module for Malware Analysis |
| 3 | +import io |
| 4 | +import logging |
| 5 | +import re |
| 6 | +from pathlib import Path |
| 7 | +from socket import ( |
| 8 | + gaierror, |
| 9 | + gethostbyname, |
| 10 | +) |
| 11 | +from urllib.parse import urlparse |
| 12 | + |
| 13 | +from django.conf import settings |
| 14 | + |
| 15 | +import IP2Location |
| 16 | + |
| 17 | +from MobSF.utils import ( |
| 18 | + is_internet_available, |
| 19 | + update_local_db, |
| 20 | +) |
| 21 | + |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Single IP2Location reader object created at import time. It is opened on the
# BIN database and closed again inside MalwareDomainCheck.gelocation().
IP2Loc = IP2Location.IP2Location()
| 24 | + |
| 25 | + |
class MalwareDomainCheck:
    """Check domains against local malware lists and geolocate them.

    ``scan()`` is the entry point: it extracts domains from the given URLs,
    refreshes the local signature files when internet is available, tags
    domains found in either list as bad, and finally attaches geolocation
    data to every domain. Results accumulate in ``self.result`` as a mapping
    of domain -> details dict.
    """

    def __init__(self):
        """Resolve signature file paths below ``settings.SIGNATURE_DIR``."""
        self.sig_dir = Path(settings.SIGNATURE_DIR)
        self.malwaredomainlist = self.sig_dir / 'malwaredomainlist'
        self.maltrail = self.sig_dir / 'maltrail-malware-domains.txt'
        self.iplocbin = self.sig_dir / 'IP2LOCATION-LITE-DB5.IPV6.BIN'
        # domain -> details dict, filled in by the check methods.
        self.result = {}
        # Set of domains under analysis; populated by scan().
        self.domainlist = None

    def update_malware_db(self):
        """Check for update in malware DB.

        The downloaded payload only replaces the local file when its first
        line still splits into the expected 10 fields; otherwise the local
        copy is left untouched. Any error is logged, never raised.
        """
        try:
            mal_db = self.malwaredomainlist
            resp = update_local_db('Malware', settings.MALWARE_DB_URL, mal_db)
            if not resp:
                # Nothing to write (presumably: no update needed or the
                # download failed -- see update_local_db).
                return
            # DB needs update
            # Check2: DB Syntax Changed
            first_line = resp.decode('utf-8', 'ignore').split('\n')[0]
            if len(first_line.split('",')) == 10:
                # DB Format is not changed. Let's update DB
                logger.info('Updating Malware Database')
                with open(mal_db, 'wb') as wfp:
                    wfp.write(resp)
            else:
                logger.warning('Unable to Update Malware DB')
        except Exception:
            logger.exception('[ERROR] Malware DB Update')

    def update_maltrail_db(self):
        """Check for update in maltrail DB.

        The payload is only accepted when it contains more than 100 lines,
        a sanity check against truncated or changed feeds. Any error is
        logged, never raised.
        """
        try:
            mal_db = self.maltrail
            resp = update_local_db(
                'Maltrail',
                settings.MALTRAIL_DB_URL, mal_db)
            if not resp:
                return
            # DB needs update
            # Check2: DB Syntax Changed
            lines = resp.decode('utf-8', 'ignore').splitlines()
            if len(lines) > 100:
                logger.info('Updating Maltrail Database')
                with open(mal_db, 'wb') as wfp:
                    wfp.write(resp)
            else:
                logger.warning('Unable to Update Maltrail DB')
        except Exception:
            logger.exception('[ERROR] Maltrail DB Update')

    def gelocation(self):
        """Perform Geolocation.

        Every domain in ``self.domainlist`` that has no entry in
        ``self.result`` yet is tagged ``bad: 'no'``. Each domain then gets a
        ``geolocation`` key: the attribute dict of the IP2Location record for
        its resolved IP, or ``None`` when the name does not resolve.
        """
        try:
            IP2Loc.open(self.iplocbin)
            for domain in self.domainlist:
                # Tag Good Domains
                entry = self.result.setdefault(domain, {'bad': 'no'})
                # GeoIP
                ip = None
                try:
                    ip = gethostbyname(domain)
                except (gaierror, UnicodeError):
                    # Unresolvable or un-encodable name: no geolocation.
                    pass
                if ip:
                    entry['geolocation'] = vars(IP2Loc.get_all(ip))
                else:
                    entry['geolocation'] = None
        except Exception:
            logger.exception('Failed to Perform Geolocation')
        finally:
            if IP2Loc:
                IP2Loc.close()

    def malware_check(self):
        """Tag domains that appear in the malware domain list.

        Each usable DB row (more than 5 fields when split on ``","``) is
        compared against every domain in ``self.domainlist``; matches are
        stored in ``self.result`` with ``bad: 'yes'``.
        """
        try:
            with io.open(self.malwaredomainlist,
                         mode='r',
                         encoding='utf8',
                         errors='ignore') as flip:
                entry_list = flip.readlines()
            for entry in entry_list:
                enlist = entry.split('","')
                if len(enlist) <= 5:
                    continue
                dmn_url = enlist[1]
                details_dict = {
                    'domain_or_url': dmn_url,
                    'ip': enlist[2],
                    'desc': enlist[4],
                    'bad': 'yes',
                }
                # Depends only on the DB row, so compute it once per row
                # instead of once per (row, domain) pair.
                dmn_neturl = get_netloc(dmn_url)
                for domain in self.domainlist:
                    # NOTE(review): domain[4:] presumably strips a leading
                    # 'www.', but it drops the first four characters of any
                    # domain -- confirm whether that is intended.
                    host_match = (
                        (dmn_neturl == domain or dmn_neturl == domain[4:])
                        and len(dmn_url) > 1)
                    # NOTE(review): prefix match, so '1.2.3.4' also matches
                    # a DB IP of '1.2.3.40' -- confirm.
                    ip_match = details_dict['ip'].startswith(domain)
                    if host_match or ip_match:
                        # Store a copy: gelocation() later writes a
                        # per-domain 'geolocation' key into this dict, and a
                        # shared object would make matching domains
                        # overwrite each other's value.
                        self.result[domain] = dict(details_dict)
        except Exception:
            logger.exception('[ERROR] Performing Malware Check')

    def maltrail_check(self):
        """Tag domains that appear verbatim as a line of the Maltrail list."""
        try:
            with io.open(self.maltrail,
                         mode='r',
                         encoding='utf8',
                         errors='ignore') as flip:
                # A set gives O(1) membership tests for the loop below.
                known_bad = set(flip.read().splitlines())
            for domain in self.domainlist:
                if domain in known_bad:
                    self.result[domain] = {
                        'domain_or_url': domain,
                        'ip': 'N/A',
                        'desc': 'Malicious Domain tagged by Maltrail',
                        'bad': 'yes',
                    }
        except Exception:
            logger.exception('[ERROR] Performing Maltrail Check')

    def update(self):
        """Refresh both signature files when internet is available."""
        if is_internet_available():
            self.update_malware_db()
            self.update_maltrail_db()
        else:
            logger.warning('Internet not available. '
                           'Skipping Malware Database Update.')

    def scan(self, urls):
        """Run all checks for the domains found in ``urls``.

        Returns ``self.result``. It is returned unchanged when the scan is
        disabled via ``settings.DOMAIN_MALWARE_SCAN`` or when no domain
        could be extracted from ``urls``.
        """
        if not settings.DOMAIN_MALWARE_SCAN:
            logger.info('Domain Malware Check disabled in settings')
            return self.result
        self.domainlist = get_domains(urls)
        if self.domainlist:
            self.update()
            self.malware_check()
            self.maltrail_check()
            self.gelocation()
        return self.result
| 169 | + |
| 170 | + |
| 171 | +# Helper Functions |
| 172 | + |
def verify_domain(checkeddom):
    """Tell whether ``checkeddom`` looks like a usable domain name.

    Returns True when the value is longer than 2 characters, contains a
    dot, does not end with a dot and has at least one ASCII letter or
    digit; False otherwise. If the checks themselves raise, the error is
    logged and None is returned.
    """
    try:
        # Guard clauses, evaluated in the same order as a chained `and`.
        if len(checkeddom) <= 2:
            return False
        if '.' not in checkeddom:
            return False
        if checkeddom.endswith('.'):
            return False
        return re.search('[a-zA-Z0-9]', checkeddom) is not None
    except Exception:
        logger.exception('[ERROR] Verifying Domain')
| 184 | + |
| 185 | + |
def get_netloc(url):
    """Return the network location part of ``url`` if it is a valid domain.

    A URL without a scheme is re-parsed with a leading ``//`` so that its
    first component is treated as the network location. Returns None when
    the result fails ``verify_domain`` or when parsing raises (the error is
    logged).
    """
    try:
        parsed = urlparse(url)
        if not parsed.scheme:
            parsed = urlparse('//' + url)
        netloc = str(parsed.netloc)
        return netloc if verify_domain(netloc) else None
    except Exception:
        logger.exception('[ERROR] Extracting Domain form URL')
| 199 | + |
| 200 | + |
def sanitize_domain(domain):
    """Sanitize domain to be RFC1034 compliant.

    Keeps only the part before the first underscore, removes every
    character that is not a word character, dot or hyphen, and strips
    leading and trailing hyphens.
    """
    domain = domain.split('_')[0]
    # Inside a character class only the first '^' negates; the extra '^'
    # characters previously listed there were literals, which let '^' pass
    # through unsanitized.
    domain = re.sub(r'[^\w.\-]', '', domain)
    # Equivalent to peeling one hyphen at a time, without the risk of
    # hitting the recursion limit on long runs of hyphens.
    return domain.strip('-')
| 210 | + |
| 211 | + |
def get_domains(urls):
    """Get Domains.

    Extracts the hostname of every entry in ``urls`` (entries without a
    scheme are parsed as ``//<entry>``), sanitizes it and collects the ones
    that pass ``verify_domain``.

    Returns a set of domain strings. A malformed entry is logged and
    skipped so that it cannot discard the domains of the other entries.
    Returns None only if an unexpected error escapes (it is logged).
    """
    try:
        domains = set()
        for url in urls:
            try:
                parse_uri = urlparse(url)
                if not parse_uri.scheme:
                    parse_uri = urlparse('//' + url)
                hostname = parse_uri.hostname
                if not hostname:
                    # No host component at all; nothing to check.
                    continue
                domain = sanitize_domain(hostname)
            except (TypeError, ValueError):
                # urlparse raises ValueError for e.g. unmatched brackets in
                # the netloc; non-string entries raise TypeError.
                logger.warning('Skipping malformed URL: %s', url)
                continue
            if verify_domain(domain):
                domains.add(domain)
        return domains
    except Exception:
        logger.exception('[ERROR] Extracting Domain form URL')
0 commit comments