0e7219b191
This fix addresses a critical security vulnerability where HTTP requests could hang indefinitely, potentially causing denial of service. Changes: - Added 10-second timeout to version check API call - Added 10-second timeout to GitHub pull request API call - Added 30-second timeout to data file downloads (larger timeout for data) - Added 10-second timeout to exclusions list download Impact: - Prevents infinite hangs that could freeze the application - Improves user experience with predictable response times - Fixes security issue flagged by Bandit static analysis (B113) - Makes the application more robust in poor network conditions The timeouts are conservative enough to work with slow connections while preventing indefinite blocking that could be exploited.
991 lines
36 KiB
Python
991 lines
36 KiB
Python
#! /usr/bin/env python3
|
|
|
|
"""
|
|
Sherlock: Find Usernames Across Social Networks Module
|
|
|
|
This module contains the main logic to search for usernames at social
|
|
networks.
|
|
"""
|
|
|
|
import sys
|
|
|
|
try:
|
|
from sherlock_project.__init__ import import_error_test_var # noqa: F401
|
|
except ImportError:
|
|
print("Did you run Sherlock with `python3 sherlock/sherlock.py ...`?")
|
|
print("This is an outdated method. Please see https://sherlockproject.xyz/installation for up to date instructions.")
|
|
sys.exit(1)
|
|
|
|
import csv
|
|
import signal
|
|
import pandas as pd
|
|
import os
|
|
import re
|
|
from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
|
from json import loads as json_loads
|
|
from time import monotonic
|
|
from typing import Optional
|
|
|
|
import requests
|
|
from requests_futures.sessions import FuturesSession
|
|
|
|
from sherlock_project.__init__ import (
|
|
__longname__,
|
|
__shortname__,
|
|
__version__,
|
|
forge_api_latest_release,
|
|
)
|
|
|
|
from sherlock_project.result import QueryStatus
|
|
from sherlock_project.result import QueryResult
|
|
from sherlock_project.notify import QueryNotify
|
|
from sherlock_project.notify import QueryNotifyPrint
|
|
from sherlock_project.sites import SitesInformation
|
|
from colorama import init
|
|
from argparse import ArgumentTypeError
|
|
|
|
|
|
class SherlockFuturesSession(FuturesSession):
|
|
def request(self, method, url, hooks=None, *args, **kwargs):
|
|
"""Request URL.
|
|
|
|
This extends the FuturesSession request method to calculate a response
|
|
time metric to each request.
|
|
|
|
It is taken (almost) directly from the following Stack Overflow answer:
|
|
https://github.com/ross/requests-futures#working-in-the-background
|
|
|
|
Keyword Arguments:
|
|
self -- This object.
|
|
method -- String containing method desired for request.
|
|
url -- String containing URL for request.
|
|
hooks -- Dictionary containing hooks to execute after
|
|
request finishes.
|
|
args -- Arguments.
|
|
kwargs -- Keyword arguments.
|
|
|
|
Return Value:
|
|
Request object.
|
|
"""
|
|
# Record the start time for the request.
|
|
if hooks is None:
|
|
hooks = {}
|
|
start = monotonic()
|
|
|
|
def response_time(resp, *args, **kwargs):
|
|
"""Response Time Hook.
|
|
|
|
Keyword Arguments:
|
|
resp -- Response object.
|
|
args -- Arguments.
|
|
kwargs -- Keyword arguments.
|
|
|
|
Return Value:
|
|
Nothing.
|
|
"""
|
|
resp.elapsed = monotonic() - start
|
|
|
|
return
|
|
|
|
# Install hook to execute when response completes.
|
|
# Make sure that the time measurement hook is first, so we will not
|
|
# track any later hook's execution time.
|
|
try:
|
|
if isinstance(hooks["response"], list):
|
|
hooks["response"].insert(0, response_time)
|
|
elif isinstance(hooks["response"], tuple):
|
|
# Convert tuple to list and insert time measurement hook first.
|
|
hooks["response"] = list(hooks["response"])
|
|
hooks["response"].insert(0, response_time)
|
|
else:
|
|
# Must have previously contained a single hook function,
|
|
# so convert to list.
|
|
hooks["response"] = [response_time, hooks["response"]]
|
|
except KeyError:
|
|
# No response hook was already defined, so install it ourselves.
|
|
hooks["response"] = [response_time]
|
|
|
|
return super(SherlockFuturesSession, self).request(
|
|
method, url, hooks=hooks, *args, **kwargs
|
|
)
|
|
|
|
|
|
def get_response(request_future, error_type, social_network):
|
|
# Default for Response object if some failure occurs.
|
|
response = None
|
|
|
|
error_context = "General Unknown Error"
|
|
exception_text = None
|
|
try:
|
|
response = request_future.result()
|
|
if response.status_code:
|
|
# Status code exists in response object
|
|
error_context = None
|
|
except requests.exceptions.HTTPError as errh:
|
|
error_context = "HTTP Error"
|
|
exception_text = str(errh)
|
|
except requests.exceptions.ProxyError as errp:
|
|
error_context = "Proxy Error"
|
|
exception_text = str(errp)
|
|
except requests.exceptions.ConnectionError as errc:
|
|
error_context = "Error Connecting"
|
|
exception_text = str(errc)
|
|
except requests.exceptions.Timeout as errt:
|
|
error_context = "Timeout Error"
|
|
exception_text = str(errt)
|
|
except requests.exceptions.RequestException as err:
|
|
error_context = "Unknown Error"
|
|
exception_text = str(err)
|
|
|
|
return response, error_context, exception_text
|
|
|
|
|
|
def interpolate_string(input_object, username):
|
|
if isinstance(input_object, str):
|
|
return input_object.replace("{}", username)
|
|
elif isinstance(input_object, dict):
|
|
return {k: interpolate_string(v, username) for k, v in input_object.items()}
|
|
elif isinstance(input_object, list):
|
|
return [interpolate_string(i, username) for i in input_object]
|
|
return input_object
|
|
|
|
|
|
def check_for_parameter(username):
|
|
"""checks if {?} exists in the username
|
|
if exist it means that sherlock is looking for more multiple username"""
|
|
return "{?}" in username
|
|
|
|
|
|
checksymbols = ["_", "-", "."]
|
|
|
|
|
|
def multiple_usernames(username):
|
|
"""replace the parameter with with symbols and return a list of usernames"""
|
|
allUsernames = []
|
|
for i in checksymbols:
|
|
allUsernames.append(username.replace("{?}", i))
|
|
return allUsernames
|
|
|
|
|
|
def sherlock(
|
|
username: str,
|
|
site_data: dict[str, dict[str, str]],
|
|
query_notify: QueryNotify,
|
|
tor: bool = False,
|
|
unique_tor: bool = False,
|
|
dump_response: bool = False,
|
|
proxy: Optional[str] = None,
|
|
timeout: int = 60,
|
|
) -> dict[str, dict[str, str | QueryResult]]:
|
|
"""Run Sherlock Analysis.
|
|
|
|
Checks for existence of username on various social media sites.
|
|
|
|
Keyword Arguments:
|
|
username -- String indicating username that report
|
|
should be created against.
|
|
site_data -- Dictionary containing all of the site data.
|
|
query_notify -- Object with base type of QueryNotify().
|
|
This will be used to notify the caller about
|
|
query results.
|
|
tor -- Boolean indicating whether to use a tor circuit for the requests.
|
|
unique_tor -- Boolean indicating whether to use a new tor circuit for each request.
|
|
proxy -- String indicating the proxy URL
|
|
timeout -- Time in seconds to wait before timing out request.
|
|
Default is 60 seconds.
|
|
|
|
Return Value:
|
|
Dictionary containing results from report. Key of dictionary is the name
|
|
of the social network site, and the value is another dictionary with
|
|
the following keys:
|
|
url_main: URL of main site.
|
|
url_user: URL of user on site (if account exists).
|
|
status: QueryResult() object indicating results of test for
|
|
account existence.
|
|
http_status: HTTP status code of query which checked for existence on
|
|
site.
|
|
response_text: Text that came back from request. May be None if
|
|
there was an HTTP error when checking for existence.
|
|
"""
|
|
|
|
# Notify caller that we are starting the query.
|
|
query_notify.start(username)
|
|
# Create session based on request methodology
|
|
if tor or unique_tor:
|
|
try:
|
|
from torrequest import TorRequest # noqa: E402
|
|
except ImportError:
|
|
print("Important!")
|
|
print("> --tor and --unique-tor are now DEPRECATED, and may be removed in a future release of Sherlock.")
|
|
print("> If you've installed Sherlock via pip, you can include the optional dependency via `pip install 'sherlock-project[tor]'`.")
|
|
print("> Other packages should refer to their documentation, or install it separately with `pip install torrequest`.\n")
|
|
sys.exit(query_notify.finish())
|
|
|
|
print("Important!")
|
|
print("> --tor and --unique-tor are now DEPRECATED, and may be removed in a future release of Sherlock.")
|
|
|
|
# Requests using Tor obfuscation
|
|
try:
|
|
underlying_request = TorRequest()
|
|
except OSError:
|
|
print("Tor not found in system path. Unable to continue.\n")
|
|
sys.exit(query_notify.finish())
|
|
|
|
underlying_session = underlying_request.session
|
|
else:
|
|
# Normal requests
|
|
underlying_session = requests.session()
|
|
underlying_request = requests.Request()
|
|
|
|
# Limit number of workers to 20.
|
|
# This is probably vastly overkill.
|
|
if len(site_data) >= 20:
|
|
max_workers = 20
|
|
else:
|
|
max_workers = len(site_data)
|
|
|
|
# Create multi-threaded session for all requests.
|
|
session = SherlockFuturesSession(
|
|
max_workers=max_workers, session=underlying_session
|
|
)
|
|
|
|
# Results from analysis of all sites
|
|
results_total = {}
|
|
|
|
# First create futures for all requests. This allows for the requests to run in parallel
|
|
for social_network, net_info in site_data.items():
|
|
# Results from analysis of this specific site
|
|
results_site = {"url_main": net_info.get("urlMain")}
|
|
|
|
# Record URL of main site
|
|
|
|
# A user agent is needed because some sites don't return the correct
|
|
# information since they think that we are bots (Which we actually are...)
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:129.0) Gecko/20100101 Firefox/129.0",
|
|
}
|
|
|
|
if "headers" in net_info:
|
|
# Override/append any extra headers required by a given site.
|
|
headers.update(net_info["headers"])
|
|
|
|
# URL of user on site (if it exists)
|
|
url = interpolate_string(net_info["url"], username.replace(' ', '%20'))
|
|
|
|
# Don't make request if username is invalid for the site
|
|
regex_check = net_info.get("regexCheck")
|
|
if regex_check and re.search(regex_check, username) is None:
|
|
# No need to do the check at the site: this username is not allowed.
|
|
results_site["status"] = QueryResult(
|
|
username, social_network, url, QueryStatus.ILLEGAL
|
|
)
|
|
results_site["url_user"] = ""
|
|
results_site["http_status"] = ""
|
|
results_site["response_text"] = ""
|
|
query_notify.update(results_site["status"])
|
|
else:
|
|
# URL of user on site (if it exists)
|
|
results_site["url_user"] = url
|
|
url_probe = net_info.get("urlProbe")
|
|
request_method = net_info.get("request_method")
|
|
request_payload = net_info.get("request_payload")
|
|
request = None
|
|
|
|
if request_method is not None:
|
|
if request_method == "GET":
|
|
request = session.get
|
|
elif request_method == "HEAD":
|
|
request = session.head
|
|
elif request_method == "POST":
|
|
request = session.post
|
|
elif request_method == "PUT":
|
|
request = session.put
|
|
else:
|
|
raise RuntimeError(f"Unsupported request_method for {url}")
|
|
|
|
if request_payload is not None:
|
|
request_payload = interpolate_string(request_payload, username)
|
|
|
|
if url_probe is None:
|
|
# Probe URL is normal one seen by people out on the web.
|
|
url_probe = url
|
|
else:
|
|
# There is a special URL for probing existence separate
|
|
# from where the user profile normally can be found.
|
|
url_probe = interpolate_string(url_probe, username)
|
|
|
|
if request is None:
|
|
if net_info["errorType"] == "status_code":
|
|
# In most cases when we are detecting by status code,
|
|
# it is not necessary to get the entire body: we can
|
|
# detect fine with just the HEAD response.
|
|
request = session.head
|
|
else:
|
|
# Either this detect method needs the content associated
|
|
# with the GET response, or this specific website will
|
|
# not respond properly unless we request the whole page.
|
|
request = session.get
|
|
|
|
if net_info["errorType"] == "response_url":
|
|
# Site forwards request to a different URL if username not
|
|
# found. Disallow the redirect so we can capture the
|
|
# http status from the original URL request.
|
|
allow_redirects = False
|
|
else:
|
|
# Allow whatever redirect that the site wants to do.
|
|
# The final result of the request will be what is available.
|
|
allow_redirects = True
|
|
|
|
# This future starts running the request in a new thread, doesn't block the main thread
|
|
if proxy is not None:
|
|
proxies = {"http": proxy, "https": proxy}
|
|
future = request(
|
|
url=url_probe,
|
|
headers=headers,
|
|
proxies=proxies,
|
|
allow_redirects=allow_redirects,
|
|
timeout=timeout,
|
|
json=request_payload,
|
|
)
|
|
else:
|
|
future = request(
|
|
url=url_probe,
|
|
headers=headers,
|
|
allow_redirects=allow_redirects,
|
|
timeout=timeout,
|
|
json=request_payload,
|
|
)
|
|
|
|
# Store future in data for access later
|
|
net_info["request_future"] = future
|
|
|
|
# Reset identify for tor (if needed)
|
|
if unique_tor:
|
|
underlying_request.reset_identity()
|
|
|
|
# Add this site's results into final dictionary with all the other results.
|
|
results_total[social_network] = results_site
|
|
|
|
# Open the file containing account links
|
|
# Core logic: If tor requests, make them here. If multi-threaded requests, wait for responses
|
|
for social_network, net_info in site_data.items():
|
|
# Retrieve results again
|
|
results_site = results_total.get(social_network)
|
|
|
|
# Retrieve other site information again
|
|
url = results_site.get("url_user")
|
|
status = results_site.get("status")
|
|
if status is not None:
|
|
# We have already determined the user doesn't exist here
|
|
continue
|
|
|
|
# Get the expected error type
|
|
error_type = net_info["errorType"]
|
|
|
|
# Retrieve future and ensure it has finished
|
|
future = net_info["request_future"]
|
|
r, error_text, exception_text = get_response(
|
|
request_future=future, error_type=error_type, social_network=social_network
|
|
)
|
|
|
|
# Get response time for response of our request.
|
|
try:
|
|
response_time = r.elapsed
|
|
except AttributeError:
|
|
response_time = None
|
|
|
|
# Attempt to get request information
|
|
try:
|
|
http_status = r.status_code
|
|
except Exception:
|
|
http_status = "?"
|
|
try:
|
|
response_text = r.text.encode(r.encoding or "UTF-8")
|
|
except Exception:
|
|
response_text = ""
|
|
|
|
query_status = QueryStatus.UNKNOWN
|
|
error_context = None
|
|
|
|
# As WAFs advance and evolve, they will occasionally block Sherlock and
|
|
# lead to false positives and negatives. Fingerprints should be added
|
|
# here to filter results that fail to bypass WAFs. Fingerprints should
|
|
# be highly targetted. Comment at the end of each fingerprint to
|
|
# indicate target and date fingerprinted.
|
|
WAFHitMsgs = [
|
|
r'.loading-spinner{visibility:hidden}body.no-js .challenge-running{display:none}body.dark{background-color:#222;color:#d9d9d9}body.dark a{color:#fff}body.dark a:hover{color:#ee730a;text-decoration:underline}body.dark .lds-ring div{border-color:#999 transparent transparent}body.dark .font-red{color:#b20f03}body.dark', # 2024-05-13 Cloudflare
|
|
r'<span id="challenge-error-text">', # 2024-11-11 Cloudflare error page
|
|
r'AwsWafIntegration.forceRefreshToken', # 2024-11-11 Cloudfront (AWS)
|
|
r'{return l.onPageView}}),Object.defineProperty(r,"perimeterxIdentifiers",{enumerable:' # 2024-04-09 PerimeterX / Human Security
|
|
]
|
|
|
|
if error_text is not None:
|
|
error_context = error_text
|
|
|
|
elif any(hitMsg in r.text for hitMsg in WAFHitMsgs):
|
|
query_status = QueryStatus.WAF
|
|
|
|
elif error_type == "message":
|
|
# error_flag True denotes no error found in the HTML
|
|
# error_flag False denotes error found in the HTML
|
|
error_flag = True
|
|
errors = net_info.get("errorMsg")
|
|
# errors will hold the error message
|
|
# it can be string or list
|
|
# by isinstance method we can detect that
|
|
# and handle the case for strings as normal procedure
|
|
# and if its list we can iterate the errors
|
|
if isinstance(errors, str):
|
|
# Checks if the error message is in the HTML
|
|
# if error is present we will set flag to False
|
|
if errors in r.text:
|
|
error_flag = False
|
|
else:
|
|
# If it's list, it will iterate all the error message
|
|
for error in errors:
|
|
if error in r.text:
|
|
error_flag = False
|
|
break
|
|
if error_flag:
|
|
query_status = QueryStatus.CLAIMED
|
|
else:
|
|
query_status = QueryStatus.AVAILABLE
|
|
elif error_type == "status_code":
|
|
error_codes = net_info.get("errorCode")
|
|
query_status = QueryStatus.CLAIMED
|
|
|
|
# Type consistency, allowing for both singlets and lists in manifest
|
|
if isinstance(error_codes, int):
|
|
error_codes = [error_codes]
|
|
|
|
if error_codes is not None and r.status_code in error_codes:
|
|
query_status = QueryStatus.AVAILABLE
|
|
elif r.status_code >= 300 or r.status_code < 200:
|
|
query_status = QueryStatus.AVAILABLE
|
|
elif error_type == "response_url":
|
|
# For this detection method, we have turned off the redirect.
|
|
# So, there is no need to check the response URL: it will always
|
|
# match the request. Instead, we will ensure that the response
|
|
# code indicates that the request was successful (i.e. no 404, or
|
|
# forward to some odd redirect).
|
|
if 200 <= r.status_code < 300:
|
|
query_status = QueryStatus.CLAIMED
|
|
else:
|
|
query_status = QueryStatus.AVAILABLE
|
|
else:
|
|
# It should be impossible to ever get here...
|
|
raise ValueError(
|
|
f"Unknown Error Type '{error_type}' for " f"site '{social_network}'"
|
|
)
|
|
|
|
if dump_response:
|
|
print("+++++++++++++++++++++")
|
|
print(f"TARGET NAME : {social_network}")
|
|
print(f"USERNAME : {username}")
|
|
print(f"TARGET URL : {url}")
|
|
print(f"TEST METHOD : {error_type}")
|
|
try:
|
|
print(f"STATUS CODES : {net_info['errorCode']}")
|
|
except KeyError:
|
|
pass
|
|
print("Results...")
|
|
try:
|
|
print(f"RESPONSE CODE : {r.status_code}")
|
|
except Exception:
|
|
pass
|
|
try:
|
|
print(f"ERROR TEXT : {net_info['errorMsg']}")
|
|
except KeyError:
|
|
pass
|
|
print(">>>>> BEGIN RESPONSE TEXT")
|
|
try:
|
|
print(r.text)
|
|
except Exception:
|
|
pass
|
|
print("<<<<< END RESPONSE TEXT")
|
|
print("VERDICT : " + str(query_status))
|
|
print("+++++++++++++++++++++")
|
|
|
|
# Notify caller about results of query.
|
|
result: QueryResult = QueryResult(
|
|
username=username,
|
|
site_name=social_network,
|
|
site_url_user=url,
|
|
status=query_status,
|
|
query_time=response_time,
|
|
context=error_context,
|
|
)
|
|
query_notify.update(result)
|
|
|
|
# Save status of request
|
|
results_site["status"] = result
|
|
|
|
# Save results from request
|
|
results_site["http_status"] = http_status
|
|
results_site["response_text"] = response_text
|
|
|
|
# Add this site's results into final dictionary with all of the other results.
|
|
results_total[social_network] = results_site
|
|
|
|
return results_total
|
|
|
|
|
|
def timeout_check(value):
|
|
"""Check Timeout Argument.
|
|
|
|
Checks timeout for validity.
|
|
|
|
Keyword Arguments:
|
|
value -- Time in seconds to wait before timing out request.
|
|
|
|
Return Value:
|
|
Floating point number representing the time (in seconds) that should be
|
|
used for the timeout.
|
|
|
|
NOTE: Will raise an exception if the timeout in invalid.
|
|
"""
|
|
|
|
float_value = float(value)
|
|
|
|
if float_value <= 0:
|
|
raise ArgumentTypeError(
|
|
f"Invalid timeout value: {value}. Timeout must be a positive number."
|
|
)
|
|
|
|
return float_value
|
|
|
|
|
|
def handler(signal_received, frame):
|
|
"""Exit gracefully without throwing errors
|
|
|
|
Source: https://www.devdungeon.com/content/python-catch-sigint-ctrl-c
|
|
"""
|
|
sys.exit(0)
|
|
|
|
|
|
def main():
|
|
parser = ArgumentParser(
|
|
formatter_class=RawDescriptionHelpFormatter,
|
|
description=f"{__longname__} (Version {__version__})",
|
|
)
|
|
parser.add_argument(
|
|
"--version",
|
|
action="version",
|
|
version=f"{__shortname__} v{__version__}",
|
|
help="Display version information and dependencies.",
|
|
)
|
|
parser.add_argument(
|
|
"--verbose",
|
|
"-v",
|
|
"-d",
|
|
"--debug",
|
|
action="store_true",
|
|
dest="verbose",
|
|
default=False,
|
|
help="Display extra debugging information and metrics.",
|
|
)
|
|
parser.add_argument(
|
|
"--folderoutput",
|
|
"-fo",
|
|
dest="folderoutput",
|
|
help="If using multiple usernames, the output of the results will be saved to this folder.",
|
|
)
|
|
parser.add_argument(
|
|
"--output",
|
|
"-o",
|
|
dest="output",
|
|
help="If using single username, the output of the result will be saved to this file.",
|
|
)
|
|
parser.add_argument(
|
|
"--tor",
|
|
"-t",
|
|
action="store_true",
|
|
dest="tor",
|
|
default=False,
|
|
help="Make requests over Tor; increases runtime; requires Tor to be installed and in system path.",
|
|
)
|
|
parser.add_argument(
|
|
"--unique-tor",
|
|
"-u",
|
|
action="store_true",
|
|
dest="unique_tor",
|
|
default=False,
|
|
help="Make requests over Tor with new Tor circuit after each request; increases runtime; requires Tor to be installed and in system path.",
|
|
)
|
|
parser.add_argument(
|
|
"--csv",
|
|
action="store_true",
|
|
dest="csv",
|
|
default=False,
|
|
help="Create Comma-Separated Values (CSV) File.",
|
|
)
|
|
parser.add_argument(
|
|
"--xlsx",
|
|
action="store_true",
|
|
dest="xlsx",
|
|
default=False,
|
|
help="Create the standard file for the modern Microsoft Excel spreadsheet (xlsx).",
|
|
)
|
|
parser.add_argument(
|
|
"--site",
|
|
action="append",
|
|
metavar="SITE_NAME",
|
|
dest="site_list",
|
|
default=[],
|
|
help="Limit analysis to just the listed sites. Add multiple options to specify more than one site.",
|
|
)
|
|
parser.add_argument(
|
|
"--proxy",
|
|
"-p",
|
|
metavar="PROXY_URL",
|
|
action="store",
|
|
dest="proxy",
|
|
default=None,
|
|
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
|
|
)
|
|
parser.add_argument(
|
|
"--dump-response",
|
|
action="store_true",
|
|
dest="dump_response",
|
|
default=False,
|
|
help="Dump the HTTP response to stdout for targeted debugging.",
|
|
)
|
|
parser.add_argument(
|
|
"--json",
|
|
"-j",
|
|
metavar="JSON_FILE",
|
|
dest="json_file",
|
|
default=None,
|
|
help="Load data from a JSON file or an online, valid, JSON file. Upstream PR numbers also accepted.",
|
|
)
|
|
parser.add_argument(
|
|
"--timeout",
|
|
action="store",
|
|
metavar="TIMEOUT",
|
|
dest="timeout",
|
|
type=timeout_check,
|
|
default=60,
|
|
help="Time (in seconds) to wait for response to requests (Default: 60)",
|
|
)
|
|
parser.add_argument(
|
|
"--print-all",
|
|
action="store_true",
|
|
dest="print_all",
|
|
default=False,
|
|
help="Output sites where the username was not found.",
|
|
)
|
|
parser.add_argument(
|
|
"--print-found",
|
|
action="store_true",
|
|
dest="print_found",
|
|
default=True,
|
|
help="Output sites where the username was found (also if exported as file).",
|
|
)
|
|
parser.add_argument(
|
|
"--no-color",
|
|
action="store_true",
|
|
dest="no_color",
|
|
default=False,
|
|
help="Don't color terminal output",
|
|
)
|
|
parser.add_argument(
|
|
"username",
|
|
nargs="+",
|
|
metavar="USERNAMES",
|
|
action="store",
|
|
help="One or more usernames to check with social networks. Check similar usernames using {?} (replace to '_', '-', '.').",
|
|
)
|
|
parser.add_argument(
|
|
"--browse",
|
|
"-b",
|
|
action="store_true",
|
|
dest="browse",
|
|
default=False,
|
|
help="Browse to all results on default browser.",
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--local",
|
|
"-l",
|
|
action="store_true",
|
|
default=False,
|
|
help="Force the use of the local data.json file.",
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--nsfw",
|
|
action="store_true",
|
|
default=False,
|
|
help="Include checking of NSFW sites from default list.",
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--no-txt",
|
|
action="store_true",
|
|
dest="no_txt",
|
|
default=False,
|
|
help="Disable creation of a txt file",
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--ignore-exclusions",
|
|
action="store_true",
|
|
dest="ignore_exclusions",
|
|
default=False,
|
|
help="Ignore upstream exclusions (may return more false positives)",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# If the user presses CTRL-C, exit gracefully without throwing errors
|
|
signal.signal(signal.SIGINT, handler)
|
|
|
|
# Check for newer version of Sherlock. If it exists, let the user know about it
|
|
try:
|
|
latest_release_raw = requests.get(forge_api_latest_release, timeout=10).text
|
|
latest_release_json = json_loads(latest_release_raw)
|
|
latest_remote_tag = latest_release_json["tag_name"]
|
|
|
|
if latest_remote_tag[1:] != __version__:
|
|
print(
|
|
f"Update available! {__version__} --> {latest_remote_tag[1:]}"
|
|
f"\n{latest_release_json['html_url']}"
|
|
)
|
|
|
|
except Exception as error:
|
|
print(f"A problem occurred while checking for an update: {error}")
|
|
|
|
# Argument check
|
|
# TODO regex check on args.proxy
|
|
if args.tor and (args.proxy is not None):
|
|
raise Exception("Tor and Proxy cannot be set at the same time.")
|
|
|
|
# Make prompts
|
|
if args.proxy is not None:
|
|
print("Using the proxy: " + args.proxy)
|
|
|
|
if args.tor or args.unique_tor:
|
|
print("Using Tor to make requests")
|
|
|
|
print(
|
|
"Warning: some websites might refuse connecting over Tor, so note that using this option might increase connection errors."
|
|
)
|
|
|
|
if args.no_color:
|
|
# Disable color output.
|
|
init(strip=True, convert=False)
|
|
else:
|
|
# Enable color output.
|
|
init(autoreset=True)
|
|
|
|
# Check if both output methods are entered as input.
|
|
if args.output is not None and args.folderoutput is not None:
|
|
print("You can only use one of the output methods.")
|
|
sys.exit(1)
|
|
|
|
# Check validity for single username output.
|
|
if args.output is not None and len(args.username) != 1:
|
|
print("You can only use --output with a single username")
|
|
sys.exit(1)
|
|
|
|
# Create object with all information about sites we are aware of.
|
|
try:
|
|
if args.local:
|
|
sites = SitesInformation(
|
|
os.path.join(os.path.dirname(__file__), "resources/data.json"),
|
|
honor_exclusions=False,
|
|
)
|
|
else:
|
|
json_file_location = args.json_file
|
|
if args.json_file:
|
|
# If --json parameter is a number, interpret it as a pull request number
|
|
if args.json_file.isnumeric():
|
|
pull_number = args.json_file
|
|
pull_url = f"https://api.github.com/repos/sherlock-project/sherlock/pulls/{pull_number}"
|
|
pull_request_raw = requests.get(pull_url, timeout=10).text
|
|
pull_request_json = json_loads(pull_request_raw)
|
|
|
|
# Check if it's a valid pull request
|
|
if "message" in pull_request_json:
|
|
print(f"ERROR: Pull request #{pull_number} not found.")
|
|
sys.exit(1)
|
|
|
|
head_commit_sha = pull_request_json["head"]["sha"]
|
|
json_file_location = f"https://raw.githubusercontent.com/sherlock-project/sherlock/{head_commit_sha}/sherlock_project/resources/data.json"
|
|
|
|
sites = SitesInformation(
|
|
data_file_path=json_file_location,
|
|
honor_exclusions=not args.ignore_exclusions,
|
|
do_not_exclude=args.site_list,
|
|
)
|
|
except Exception as error:
|
|
print(f"ERROR: {error}")
|
|
sys.exit(1)
|
|
|
|
if not args.nsfw:
|
|
sites.remove_nsfw_sites(do_not_remove=args.site_list)
|
|
|
|
# Create original dictionary from SitesInformation() object.
|
|
# Eventually, the rest of the code will be updated to use the new object
|
|
# directly, but this will glue the two pieces together.
|
|
site_data_all = {site.name: site.information for site in sites}
|
|
if args.site_list == []:
|
|
# Not desired to look at a sub-set of sites
|
|
site_data = site_data_all
|
|
else:
|
|
# User desires to selectively run queries on a sub-set of the site list.
|
|
# Make sure that the sites are supported & build up pruned site database.
|
|
site_data = {}
|
|
site_missing = []
|
|
for site in args.site_list:
|
|
counter = 0
|
|
for existing_site in site_data_all:
|
|
if site.lower() == existing_site.lower():
|
|
site_data[existing_site] = site_data_all[existing_site]
|
|
counter += 1
|
|
if counter == 0:
|
|
# Build up list of sites not supported for future error message.
|
|
site_missing.append(f"'{site}'")
|
|
|
|
if site_missing:
|
|
print(f"Error: Desired sites not found: {', '.join(site_missing)}.")
|
|
|
|
if not site_data:
|
|
sys.exit(1)
|
|
|
|
# Create notify object for query results.
|
|
query_notify = QueryNotifyPrint(
|
|
result=None, verbose=args.verbose, print_all=args.print_all, browse=args.browse
|
|
)
|
|
|
|
# Run report on all specified users.
|
|
all_usernames = []
|
|
for username in args.username:
|
|
if check_for_parameter(username):
|
|
for name in multiple_usernames(username):
|
|
all_usernames.append(name)
|
|
else:
|
|
all_usernames.append(username)
|
|
for username in all_usernames:
|
|
results = sherlock(
|
|
username,
|
|
site_data,
|
|
query_notify,
|
|
tor=args.tor,
|
|
unique_tor=args.unique_tor,
|
|
dump_response=args.dump_response,
|
|
proxy=args.proxy,
|
|
timeout=args.timeout,
|
|
)
|
|
|
|
if args.output:
|
|
result_file = args.output
|
|
elif args.folderoutput:
|
|
# The usernames results should be stored in a targeted folder.
|
|
# If the folder doesn't exist, create it first
|
|
os.makedirs(args.folderoutput, exist_ok=True)
|
|
result_file = os.path.join(args.folderoutput, f"{username}.txt")
|
|
else:
|
|
result_file = f"{username}.txt"
|
|
|
|
if not args.no_txt:
|
|
with open(result_file, "w", encoding="utf-8") as file:
|
|
exists_counter = 0
|
|
for website_name in results:
|
|
dictionary = results[website_name]
|
|
if dictionary.get("status").status == QueryStatus.CLAIMED:
|
|
exists_counter += 1
|
|
file.write(dictionary["url_user"] + "\n")
|
|
file.write(f"Total Websites Username Detected On : {exists_counter}\n")
|
|
|
|
if args.csv:
|
|
result_file = f"{username}.csv"
|
|
if args.folderoutput:
|
|
# The usernames results should be stored in a targeted folder.
|
|
# If the folder doesn't exist, create it first
|
|
os.makedirs(args.folderoutput, exist_ok=True)
|
|
result_file = os.path.join(args.folderoutput, result_file)
|
|
|
|
with open(result_file, "w", newline="", encoding="utf-8") as csv_report:
|
|
writer = csv.writer(csv_report)
|
|
writer.writerow(
|
|
[
|
|
"username",
|
|
"name",
|
|
"url_main",
|
|
"url_user",
|
|
"exists",
|
|
"http_status",
|
|
"response_time_s",
|
|
]
|
|
)
|
|
for site in results:
|
|
if (
|
|
args.print_found
|
|
and not args.print_all
|
|
and results[site]["status"].status != QueryStatus.CLAIMED
|
|
):
|
|
continue
|
|
|
|
response_time_s = results[site]["status"].query_time
|
|
if response_time_s is None:
|
|
response_time_s = ""
|
|
writer.writerow(
|
|
[
|
|
username,
|
|
site,
|
|
results[site]["url_main"],
|
|
results[site]["url_user"],
|
|
str(results[site]["status"].status),
|
|
results[site]["http_status"],
|
|
response_time_s,
|
|
]
|
|
)
|
|
if args.xlsx:
|
|
usernames = []
|
|
names = []
|
|
url_main = []
|
|
url_user = []
|
|
exists = []
|
|
http_status = []
|
|
response_time_s = []
|
|
|
|
for site in results:
|
|
if (
|
|
args.print_found
|
|
and not args.print_all
|
|
and results[site]["status"].status != QueryStatus.CLAIMED
|
|
):
|
|
continue
|
|
|
|
if response_time_s is None:
|
|
response_time_s.append("")
|
|
else:
|
|
response_time_s.append(results[site]["status"].query_time)
|
|
usernames.append(username)
|
|
names.append(site)
|
|
url_main.append(results[site]["url_main"])
|
|
url_user.append(results[site]["url_user"])
|
|
exists.append(str(results[site]["status"].status))
|
|
http_status.append(results[site]["http_status"])
|
|
|
|
DataFrame = pd.DataFrame(
|
|
{
|
|
"username": usernames,
|
|
"name": names,
|
|
"url_main": url_main,
|
|
"url_user": url_user,
|
|
"exists": exists,
|
|
"http_status": http_status,
|
|
"response_time_s": response_time_s,
|
|
}
|
|
)
|
|
DataFrame.to_excel(f"{username}.xlsx", sheet_name="sheet1", index=False)
|
|
|
|
print()
|
|
query_notify.finish()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|