utils(scripts): finalize nitter guest account grabber

Changed files
+119 -6
utils
+119 -6
utils/nitter-guest-account.py
···
# thank you!
import sys
import json
from base64 import b64encode
from argparse import ArgumentParser
···
}
# Functions
-
def format_json(msg, object) -> str:
return json.dumps(object, indent=None if noprettyprint else 4)
-
def debug(msg, *arg, **kwarg) -> None:
global verbose
-
if verbose:
print("\x1b[37m[*]", msg, *arg, "\x1b[0m", file=sys.stderr, **kwarg)
def info(msg, *arg, **kwarg) -> None:
···
def error(msg, *arg, **kwarg) -> None:
print("\x1b[31m[x]", msg, *arg, "\x1b[0m", file=sys.stderr, **kwarg)
# Arguments
parser = ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true', help="be more noisy")
···
def main() -> int:
-
global verbose
args = parser.parse_args()
verbose = args.verbose
info("nitter-guest-account.py (2023-08-25)")
info("This is free software: you are free to change and redistribute it, under the terms of the Apache-2.0 license")
···
success(f'flow token => {flow_token}')
info('Fetching final account object...')
-
tasks = send_req('post', TASKS_ENDPOINT, headers=request_headers, json=get_tasks_body(flow_token))
-
info(tasks.json())
return 0
···
# thank you!
import sys
import json
+
import typing
+
import traceback
from base64 import b64encode
from argparse import ArgumentParser
···
}
# Functions
+
def format_json(object) -> str:
+
global noprettyprint
return json.dumps(object, indent=None if noprettyprint else 4)
+
def debug(msg, *arg, override=False, **kwarg) -> None:
global verbose
+
if verbose or override:
print("\x1b[37m[*]", msg, *arg, "\x1b[0m", file=sys.stderr, **kwarg)
def info(msg, *arg, **kwarg) -> None:
···
def error(msg, *arg, **kwarg) -> None:
print("\x1b[31m[x]", msg, *arg, "\x1b[0m", file=sys.stderr, **kwarg)
+
+
def prompt_bool(msg, default: typing.Optional[bool] = True) -> bool:
+
"""
+
Prompt the user for a y/n value until a valid input is entered.
+
+
:param msg: Message to display
+
:param default: What should the default value be if the user pressed enter. Pass in None to force user to pick one.
+
"""
+
resolved = False
+
p_string = f"({'Y' if default else 'y'}/{'N' if not default else 'n'})" if default is not None else "(y/n)"
+
while not resolved:
+
print("\x1b[35m[?]", msg, f"\x1b[0m{p_string}", file=sys.stderr, end=" ")
+
try:
+
r = input()
+
except EOFError:
+
debug("^D", override=True)
+
debug("gracefully handling EOF")
+
error("invalid input, please try again.")
+
continue
+
+
if default is None and r.strip() == "":
+
error('a response is required. please enter either y or n.')
+
continue
+
if r.strip() == "":
+
return default
+
# if r.strip()[0].lower() not in ['y', 'n']:
+
# error('invalid input. please enter either y or n.')
+
# continue
+
match r.strip()[0].lower():
+
case "y":
+
return True
+
case "n":
+
return False
+
case _:
+
error('invalid input. please enter either y or n.')
+
continue
+
+
# Arguments
parser = ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true', help="be more noisy")
···
def main() -> int:
+
global verbose, noprettyprint
args = parser.parse_args()
verbose = args.verbose
+
noprettyprint = args.no_pretty
info("nitter-guest-account.py (2023-08-25)")
info("This is free software: you are free to change and redistribute it, under the terms of the Apache-2.0 license")
···
success(f'flow token => {flow_token}')
info('Fetching final account object...')
+
tasks = send_req('post', TASKS_ENDPOINT, headers=request_headers, json=get_tasks_body(flow_token)).json()
+
try:
+
try:
+
open_account_task = next(filter(lambda i: i.get('subtask_id') == "OpenAccount", tasks['subtasks']))
+
account = open_account_task['open_account']
+
except StopIteration:
+
debug("resulting tasks =>", format_json(tasks), override=True)
+
error("Unable to acquire guest account credentials as it isn't present in the API response.")
+
error("This might be because of a wide variety of reasons, but it most likely is due to your IP being rate-limited.")
+
error("Try again with a new IP address or in 24 hours after this attempt.")
+
return 1
+
+
if args.outfile == "-":
+
debug("outfile is `-`, printing to stdout")
+
print(format_json(account))
+
return 0
+
+
# Sanity checks
+
try:
+
debug(f"attempting to read file: {args.outfile}")
+
with open(args.outfile) as f:
+
old_data = json.load(f)
+
except FileNotFoundError:
+
# that's okay, we might be able to create it later.
+
old_data = []
+
except PermissionError:
+
# that's not okay. we will need to access the file later anyways.
+
error("unable to read file due to a permission error.")
+
error("please make sure this script has read and write access to the file.")
+
print(format_json(account))
+
return 1
+
except json.JSONDecodeError:
+
error("could not parse the provided JSON file.")
+
if not prompt_bool("Do you want to overwrite the file?", default=None):
+
warn("Not overwriting file, printing to stdout instead.")
+
print(format_json(account))
+
return 1
+
debug('assuming old data is an empty array because we are overwriting')
+
old_data = []
+
if type(old_data) != list:
+
error("top-level object of the existing JSON file is not a list.")
+
error("due to the implementation, the file must be overwritten.")
+
if not (prompt_bool("Do you want to overwrite?", default=None)):
+
warn("Not overwriting existing data, printing to stdout instead.")
+
print(format_json(account))
+
return 1
+
debug("assuming old data is an empty array because we are overwriting")
+
old_data = []
+
+
old_data.append(account)
+
+
try:
+
debug("attempting to write file")
+
with open(args.outfile, 'w+') as f:
+
f.write(format_json(old_data))
+
success(f"successfully written to file {args.outfile}")
+
return 0
+
except PermissionError:
+
error("unable to write to file due to permission error.")
+
error("please make sure this script has write access to the file.")
+
print(format_json(account))
+
return 1
+
except Exception as e:
+
error("Unable to write to file due to an uncaught error:", e)
+
tb = ''.join(traceback.TracebackException.from_exception(e).format())
+
debug("exception stacktrace\n" + tb)
+
print(format_json(account))
+
+
except Exception:
+
debug("resulting tasks =>", format_json(tasks), override=True)
+
error("an unhandled error occurred. the tasks object is printed to avoid losing otherwise successful data.")
+
error("please file a bug report and attach the traceback below.")
+
raise
return 0