''' Command line module for calling tootapalooza to do its work ''' __all__ = ['tootapalooza'] __author__ = 'Paco Hope ' __date__ = '25 November 2022' __version__ = '1.0' __copyright__ = 'Copyright © 2022 Paco Hope. See LICENSE for details.' from mastodon import Mastodon import toml import os import time import argparse import sys from pathlib import Path import random import re args=None class Tooter(Mastodon): credentials: dict = {} hostname : str = '' files : dict = {} client_id : str = '.tootapalooza.env' def __init__(self, name: str): cred_dict = self.credentials[name] self.acct = name self.username = cred_dict['addr'] self.password = cred_dict['pass'] self.displayname = cred_dict['name'] self.cred_file = f'.tootapalooza-usercred-{self.acct}.env' super().__init__(client_id=self.client_id) self.log_in( self.username, self.password, to_file=self.cred_file ) self.id = self.me()['id'] @classmethod def load_credentials(cls, file: str) -> None: as_dict = toml.load(file) for username, fields in as_dict.items(): if not isinstance(fields, dict): raise TypeError(f'{username} has no key/value pairs') if 'addr' not in fields: raise KeyError(f'`addr` field missing from {username}') if 'pass' not in fields: raise KeyError(f'`pass` field missing from {username}') cls.credentials = as_dict @classmethod def load_src_files(cls, dir: Path) -> None: for item in dir.iterdir(): if not item.is_file(): continue with item.open('r') as f: cls.files[f.name] = f.readlines() @classmethod def new_message(cls) -> str: """Choose a message from all the source texts. Returns the message.""" sourcefile = random.choice(list(cls.files.values())) startline = random.randint(0,len(sourcefile)) sourceline = '' i=0 # Starting at a random line, keep adding more lines to my # toot until I get over 400 characters (max is 500 on most # mastodon servers) while(len(sourceline) < 400 and startline+i < len(sourcefile)): sourceline=sourceline+sourcefile[startline+i] i+=1 # Try to find a 400-odd character string that ends in a full stop tootline = re.search( '((\s|\S){,400})\.', sourceline ) if( tootline ): message=tootline.group(0).strip() else: message=sourceline.strip() return(message) def read_timeline(self, *timeline_args, **timeline_kwargs) -> list: """Reads the given timeline (just a wrapper for self.timeline()) but also updates the read marker for the timeline.""" result = self.timeline(*timeline_args, **timeline_kwargs) # try to get timeline from kwargs. if not passed as keyword, try to # get as positional. if no positional args were passed, try to get # as keyword again. try: timeline_name = timeline_kwargs.get('timeline',timeline_args[0]) except IndexError: timeline_name = timeline_kwargs['timeline'] if( len(result) > 0 and not args.dry_run ): self.markers_set([timeline_name], [result[-1].id]) return result def toot_tagged_public(self) -> int: """Get a random message, pick a small number of words in it to hashtag. Then toot it publicly.""" if( args.debug ): print(f"{self.acct} toot_tagged_public") rawmessage = self.new_message().split() numtaggable = sum(1 for word in rawmessage if len(word) > 3) # Pick a small number of words to tag, but not greater than the # total number of words there are, nor the total number of taggable # words. tagstodo = min(random.randint(1,6), len(rawmessage), numtaggable) while( tagstodo > 0 ): wordix = random.randrange(0, len(rawmessage)) if( len(rawmessage[wordix]) > 3 and rawmessage[wordix][0] != '#' ): rawmessage[wordix] = '#' + rawmessage[wordix] tagstodo -=1 message = ' '.join(rawmessage) if( args.dry_run ): print(f"tagged toot message: \"{message}\"") else: if( args.debug ): print(f"{self.acct} tagged toots \"{message}\"") self.toot(message) return 0 def toot_plain_public(self) -> int: """Toot a random message.""" if( args.debug ): print(f"{self.acct} toot_plain_public") message = self.new_message() if( args.dry_run ): print(f"toot message: \"{message}\"") else: if( args.debug ): print(f"{self.acct} toots \"{message}\"") self.toot(message) return 0 def toot_plain_unlisted(self) -> int: """Toot a random message unlisted.""" message = self.new_message() if( args.dry_run ): print(f"toot unlisted message: \"{message}\"") else: if( args.debug ): print(f"{self.acct} toots unlisted \"{message}\"") self.status_post( message, visibility='private' ) return( 0 ) def reply_random_local(self) -> int: if( args.debug ): print(f"{self.acct} reply_random_local") return(self._reply_random('local')) def reply_random_home(self) -> int: if( args.debug ): print(f"{self.acct} reply_random_home") return(self._reply_random('home')) def reply_random_public(self) -> int: if( args.debug ): print(f"{self.acct} reply_random_public") return(self._reply_random('public')) def _reply_random(self, timeline: str) -> int: """Read the given timeline, pick a post at random, reply to it.""" chunk_size = 20 max_posts = 100 timeline_list = [] while( len(timeline_list) < max_posts ): posts_received = self.read_timeline(timeline=timeline, limit=chunk_size) if( len(posts_received) == 0 ): break timeline_list.extend(posts_received) reply_post = random.choice(timeline_list) message = self.new_message() if( args.dry_run ): print(f"{self.acct} replied to {timeline} {reply_post.id}") else: if( args.debug ): print(f"{self.acct} replied to {reply_post.account.acct} {reply_post.uri}") self.status_post( message, in_reply_to_id=reply_post.id, visibility='public' ) def follow_random_local(self) -> int: if( args.debug ): print(f"{self.acct} follow_random_local") followlist = self.account_following(id=self.id) followed_people = {account.acct for account in followlist} # This does outersection on sets. It's the set of all users we know about # (from the users.toml file) minus ourselves and anyone we already follow potentials = set(self.credentials) ^ {self.acct} ^ followed_people follow_target = random.choice(list(potentials)) target_dict = self.account_lookup(follow_target) if( args.dry_run ): print(f"{self.acct} will follow {follow_target} (id: {target_dict.id})") else: self.account_follow(target_dict.id) return( 0 ) def unfollow_random(self) -> int: if( args.debug ): print(f"{self.acct} unfollow_random") followlist = self.account_following(id=self.id) unfollowee = random.choice(followlist) if( len(followlist) == 0 ): if( args.debug ): print(f"{self.acct} can't unfollow! Not following anyone.") return( 0 ) if( args.dry_run ): print(f"{self.acct} unfollows {unfollowee.acct} (id: {unfollowee.id})") else: self.account_unfollow(unfollowee.id) return( 0 ) def boost_random_local(self) -> int: if( args.debug ): print(f"{self.acct} boost_random_local") return( 0 ) def favourite_random_local(self) -> int: if( args.debug ): print(f"{self.acct} favourite_random_local") return( self._favourite( timeline='local') ) def favourite_random_public(self) -> int: if( args.debug ): print(f"{self.acct} favourite_random_public") return( self._favourite( timeline='public') ) def favourite_random_home(self) -> int: if( args.debug ): print(f"{self.acct} favourite_random_home") return( self._favourite( timeline='home') ) def _favourite(self, timeline: str) -> int: chunk_size = 20 timeline_list = self.read_timeline(timeline=timeline, limit=chunk_size) if( len(timeline_list) == 0 ): return( 1 ) fav_post = random.choice(timeline_list) if( args.dry_run): print(f"{self.acct} favourites {fav_post.id}") else: if( args.debug ): print(f"{self.acct} favourites {fav_post.id}") self.status_favourite(id=fav_post.id) return( 0 ) def boost_random_local(self) -> int: if( args.debug ): print(f"{self.acct} boost_random_local") return( self._boost( timeline='local') ) def boost_random_public(self) -> int: if( args.debug ): print(f"{self.acct} boost_random_public") return( self._boost( timeline='public') ) def boost_random_home(self) -> int: if( args.debug ): print(f"{self.acct} boost_random_home") return( self._boost( timeline='home') ) def _boost(self, timeline: str) -> int: chunk_size = 20 timeline_list = self.read_timeline(timeline=timeline, limit=chunk_size) if( len(timeline_list) == 0 ): return( 1 ) fav_post = random.choice(timeline_list) if( args.dry_run): print(f"{self.acct} boosts {fav_post.id}") else: if( args.debug ): print(f"{self.acct} boosts {fav_post.id}") self.status_reblog(id=fav_post.id) return( 0 ) def report_random_local(self) -> int: if( args.debug ): print(f"{self.acct} report_random_local") return( 0 ) def random_interaction(self): """Choose one possible interaction according to the weights, and do it.""" interactions = { self.reply_random_local: 4, self.reply_random_home: 4, self.reply_random_public: 4, self.follow_random_local: 2, self.unfollow_random: 1, self.toot_plain_public: 1, self.toot_tagged_public: 4, self.toot_plain_unlisted: 1, self.favourite_random_local: 2, self.favourite_random_home: 2, self.favourite_random_public: 2, self.boost_random_local: 2, self.boost_random_home: 2, self.boost_random_public: 2, self.report_random_local: 0 } chosen = random.choices(population=list(interactions.keys()), weights=list(interactions.values()))[0] chosen() def daemon_main(tooter: Tooter): """Run from a command line.""" while True: # do a thing time.sleep(600) def main(): global args parser = argparse.ArgumentParser( description='Randomly interact with a Mastodon timeline.') parser.add_argument( '-d', '--debug', action='store_true', help='Enable debugging messages.') parser.add_argument( '-o', '--once', action='store_true', help='Run once and exit. Default is to run as a daemon.') parser.add_argument( '-n', '--dry-run', action='store_true', help='Don\'t toot. Just show what would be done.') parser.add_argument( '-D', '--directory', default='text', help='Directory with text source files for toot bodies.') parser.add_argument( 'file', type=argparse.FileType('r'), help='TOML file with user credentials (see server-util/README.md).') args = parser.parse_args() p = Path(args.directory) if not p.exists(): print(f'{sys.argv[0]}: {args.directory}: No such file or directory', file=sys.stderr) return 2 if not p.is_dir(): print(f'{sys.argv[0]}: {args.directory}: Is not a directory', file=sys.stderr) return 2 Tooter.load_src_files(p) Tooter.load_credentials(args.file) if args.once: for name in Tooter.credentials: t = Tooter(name) t.random_interaction() return 0 daemon_main(t) if __name__ == '__main__': sys.exit(main())