''' Command line module for calling toota-palooza to do its work ''' __all__ = ['toota-palooza'] __author__ = 'Paco Hope ' __date__ = '25 November 2022' __version__ = '1.0' __copyright__ = 'Copyright © 2022 Paco Hope. See LICENSE for details.' from mastodon import Mastodon import toml import os import time import argparse import sys from pathlib import Path import random class Tooter(Mastodon): credentials: dict = {} hostname: str = '' files: dict = {} client_id: str = '.toota-palooza.env' def __init__(self, name: str): self.name = name cred_dict = self.credentials[self.name] self.username = cred_dict['addr'] self.password = cred_dict['pass'] self.cred_file = f'.toota-palooza-usercred-{self.name}.env' super().__init__(client_id=self.client_id, api_base_url=self.hostname) self.log_in( self.username, self.password, to_file=self.cred_file ) @classmethod def load_credentials(cls, file: str) -> None: as_dict = toml.load(file) try: cls.hostname = as_dict.pop('host') except KeyError: raise KeyError('must provide a hostname') for username, fields in as_dict.items(): if not isinstance(fields, dict): raise TypeError(f'{username} has no key/value pairs') if 'addr' not in fields: raise KeyError(f'`addr` field missing from {username}') if 'pass' not in fields: raise KeyError(f'`pass` field missing from {username}') cls.credentials = as_dict @classmethod def load_src_files(cls, dir: Path) -> None: for item in dir.iterdir(): if not item.is_file(): continue with item.open('r') as f: cls.files[f.name] = f.readlines() def check_public_timeline(tooter: Tooter): """Do one run. Connect to the database, connect to the server, get the public timeline. Look at users and check to see if any are potential impersonators. Then exit.""" # Here's the idea: pick a chunk_size. Ask the server for that many toots off the public # timeline. As long as the server gives us as many as we asked for, keep trying. # as soon as we get less than we asked for, quit. # # XXX Not sure about rate-limiting # chunk_size = 20 max_posts = 1000 timeline_list = [] userid_list = {} total = 0 calls = 0 domain_def = tooter.hostname.split('/')[2] while total < max_posts: timeline_list = tooter.timeline(timeline='public', limit=chunk_size) calls += 1 for post in timeline_list: userid = post.account.acct name_and_domain = userid.split('@', 1) username = name_and_domain[0] if len(name_and_domain) == 2: domain = name_and_domain[1] else: # if there is no domain, then it's a local account domain = domain_def userid_list[userid] = (username, domain, post.account.display_name, post.account.bot, post.url) posts_indexed = len(timeline_list) if posts_indexed == 0: # We got fewer than we asked for. Drop out of the loop. break total += posts_indexed # Ok, we got them all, time to insert return (f'{calls} calls to get {total} posts,' +f' {len(userid_list)} users processed') def daemon_main(tooter: Tooter): """Run from a command line.""" while True: # do a thing time.sleep(600) def once(tooter: Tooter): """Run from a command line.""" # message = check_public_timeline(tooter) message = f'{tooter.name} says hi!' tooter.toot( # random sentence... random.choice( # from a random line... random.choice( # from a random file... random.choice( list(tooter.files.values()) ) ) # stripped and split on full stops... .strip().split('.') ) # and of weird punctuation .strip() ) return 0 def main(): parser = argparse.ArgumentParser( description='Randomly interact with a Mastodon timeline.') parser.add_argument( '-d', '--debug', action='store_true', help='Enable debugging messages.') parser.add_argument( '-o', '--once', action='store_true', help='Run once and exit. Default is to run as a daemon.') parser.add_argument( '-D', '--directory', default='text', help='Change directory to source files from.') parser.add_argument( 'file', type=argparse.FileType('r'), help='Change file to source users from.') args = parser.parse_args() p = Path(args.directory) if not p.exists(): print(f'{sys.argv[0]}: {args.directory}: No such file or directory', file=sys.stderr) return 2 if not p.is_dir(): print(f'{sys.argv[0]}: {args.directory}: Is not a directory', file=sys.stderr) return 2 Tooter.load_src_files(p) Tooter.load_credentials(args.file) if args.once: for name in Tooter.credentials: t = Tooter(name) once(t) return 0 daemon_main(t) if __name__ == '__main__': sys.exit(main())