tootapalooza/tootapalooza/cli.py

395 lines
14 KiB
Python

'''
Command line module for calling tootapalooza to do its work.

Defines the Tooter client class (one simulated Mastodon user) and the
main() function that parses the command line and drives the Tooters.
'''
# NOTE(review): `__all__` names `tootapalooza`, but no object with that name is
# defined in the visible part of this module -- confirm before relying on
# `from ... import *`.
__all__ = ['tootapalooza']
__author__ = 'Paco Hope <tootapalooza@filter.paco.to>'
__date__ = '25 November 2022'
__version__ = '1.0'
__copyright__ = 'Copyright © 2022 Paco Hope. See LICENSE for details.'
import mastodon
import toml
import time
import argparse
import sys
from pathlib import Path
import random
import re
import logging
from tootapalooza.userdata.names import usernames
# Parsed command-line options; stays None until main() stores the result of
# parser.parse_args() here. Tooter methods read args.dry_run from it.
args=None
# Module logger; stays None until main() assigns logging.getLogger(__name__).
logger=None
class Tooter(mastodon.Mastodon):
    """A Mastodon client acting as one account from the credentials file.

    Class-level state (`credentials`, `files`) is shared by every instance and
    must be filled with load_credentials() and load_src_files() before any
    Tooter is constructed or asked to produce a message.

    Methods read the module globals `args` (for the dry-run flag) and `logger`,
    both of which are set by main().
    """

    # Account name -> dict of settings (`addr`, `pass`, optionally `name`).
    credentials: dict = {}
    hostname : str = ''
    # Source file path -> list of that file's lines, used to build messages.
    files : dict = {}
    # Passed to mastodon.Mastodon as `client_id`.
    client_id : str = '.tootapalooza.env'

    @staticmethod
    def _is_dry_run() -> bool:
        """Return True when the command line asked for a dry run.

        Treats an unset `args` (module used without main()) as "not a dry run".
        """
        return bool(getattr(args, 'dry_run', False))

    def __init__(self, name: str):
        """Create the client for account `name` and make sure it is logged in.

        Looks `name` up in `Tooter.credentials` (KeyError if absent), builds
        the underlying Mastodon client from the per-account credential file,
        and logs in with address/password when the stored token is rejected.
        On first use an empty display name and an empty bio are filled in
        (skipped in a dry run).

        If logging in fails, a critical message is logged and construction
        stops early; the instance then has no `id` attribute.
        """
        cred_dict = self.credentials[name]
        self.acct = name
        self.username = cred_dict['addr']
        self.password = cred_dict['pass']
        # load_credentials() only requires `addr` and `pass`, so do not crash
        # on a missing `name`; use the account name as the display name.
        self.displayname = cred_dict.get('name', name)
        self.cred_file = f'.tootapalooza-usercred-{self.acct}.env'

        super().__init__(access_token = self.cred_file,
                         client_id=self.client_id,
                         user_agent = "tootapalooza")

        # Try a basic call to ensure we are logged in
        try:
            account = self.me()
        except mastodon.errors.MastodonUnauthorizedError:
            logger.warning(f"Warning {self.acct} not logged in. Logging in.")
            self.revoke_access_token()
            try:
                self.log_in(
                    self.username,
                    self.password,
                    to_file=self.cred_file
                )
            except Exception:
                logger.critical( f"Failed to login as {self.acct}" )
                return None
            # if logging in worked, this will work
            account = self.me()

        if account['display_name'] == '':
            logger.debug(f"{self.acct} setting display_name for first time" )
            if not self._is_dry_run():
                self.account_update_credentials(display_name=self.displayname)
        if account['note'] == '':
            logger.debug(f"{self.acct} setting bio for first time" )
            self.set_bio()
        self.id = account['id']

    @classmethod
    def load_credentials(cls, file: str) -> None:
        """Load account settings from a TOML source into `cls.credentials`.

        `file` is handed to toml.load() unchanged. Every top-level entry must
        be a table containing `addr` and `pass`.

        Raises:
            TypeError: an entry is not a table of key/value pairs.
            KeyError: an entry lacks `addr` or `pass`.
        """
        as_dict = toml.load(file)
        for username, fields in as_dict.items():
            if not isinstance(fields, dict):
                raise TypeError(f'{username} has no key/value pairs')
            if 'addr' not in fields:
                raise KeyError(f'`addr` field missing from {username}')
            if 'pass' not in fields:
                raise KeyError(f'`pass` field missing from {username}')
        # Only publish the credentials once every entry has been validated.
        cls.credentials = as_dict

    @classmethod
    def load_src_files(cls, dir: Path) -> None:
        """Read every regular file directly inside `dir` into `cls.files`.

        Each file is stored as a list of lines keyed by the opened file's
        name. Sub-directories and other non-file entries are skipped.
        """
        for item in dir.iterdir():
            if not item.is_file():
                continue
            with item.open('r') as f:
                cls.files[f.name] = f.readlines()

    @classmethod
    def new_message(cls) -> str:
        """Choose a message from all the source texts. Returns the message.

        Raises IndexError when no non-empty source text has been loaded.
        """
        # Empty files cannot supply any text, so leave them out of the draw.
        sources = [lines for lines in cls.files.values() if lines]
        sourcefile = random.choice(sources)
        # randrange excludes len(sourcefile); randint would include it and
        # could pick a start line past the end, producing an empty message.
        startline = random.randrange(len(sourcefile))

        # Starting at a random line, keep adding more lines to my
        # toot until I get over 400 characters (max is 500 on most
        # mastodon servers)
        sourceline = ''
        for line in sourcefile[startline:]:
            if len(sourceline) >= 400:
                break
            sourceline += line

        # Try to find a 400-odd character string that ends in a full stop
        tootline = re.search(r'[\s\S]{,400}\.', sourceline)
        if tootline:
            return tootline.group(0).strip()
        return sourceline.strip()

    def read_timeline(self, *timeline_args, **timeline_kwargs) -> list:
        """Reads the given timeline (just a wrapper for self.timeline()) but
        also updates the read marker for the timeline.

        The marker is set to the id of the last post in the returned list,
        and is left alone in a dry run or when nothing was returned.
        """
        result = self.timeline(*timeline_args, **timeline_kwargs)

        # Work out which timeline was read: keyword first, then the first
        # positional argument, otherwise the default self.timeline() uses.
        if 'timeline' in timeline_kwargs:
            timeline_name = timeline_kwargs['timeline']
        elif timeline_args:
            timeline_name = timeline_args[0]
        else:
            timeline_name = 'home'

        # NOTE(review): the Mastodon markers API documents only `home` and
        # `notifications`; confirm what the server does with other names.
        if len(result) > 0 and not self._is_dry_run():
            self.markers_set([timeline_name], [result[-1].id])
        return result

    def toot_tagged_public(self) -> int:
        """Get a random message, pick a small number of words in it to
        hashtag. Then toot it publicly."""
        logger.debug(f"{self.acct} toot_tagged_public")
        words = self.new_message().split()

        # Only words longer than 3 characters that are not already hashtags
        # can be tagged. Sampling from exactly those positions guarantees the
        # selection terminates even when the text already contains hashtags.
        taggable = [ix for ix, word in enumerate(words)
                    if len(word) > 3 and not word.startswith('#')]
        if taggable:
            tagstodo = min(random.randint(1, 6), len(taggable))
            for ix in random.sample(taggable, tagstodo):
                words[ix] = '#' + words[ix]

        message = ' '.join(words)
        if self._is_dry_run():
            logger.info(f"tagged toot message: \"{message}\"")
        else:
            logger.debug(f"{self.acct} tagged toots \"{message}\"")
            self.toot(message)
        return 0

    def toot_plain_public(self) -> int:
        """Toot a random message."""
        logger.debug(f"{self.acct} toot_plain_public")
        message = self.new_message()
        if self._is_dry_run():
            logger.info(f"toot message: \"{message}\"")
        else:
            logger.debug(f"{self.acct} toots \"{message}\"")
            self.toot(message)
        return 0

    def toot_plain_unlisted(self) -> int:
        """Toot a random message unlisted."""
        logger.debug(f"{self.acct} toot_plain_unlisted")
        message = self.new_message()
        if self._is_dry_run():
            logger.info(f"toot unlisted message: \"{message}\"")
        else:
            logger.debug(f"{self.acct} toots unlisted \"{message}\"")
            # 'private' would mean followers-only; this method posts unlisted.
            self.status_post( message, visibility='unlisted' )
        return 0

    def reply_random_local(self) -> int:
        """Reply to a random post from the local timeline."""
        logger.debug(f"{self.acct} reply_random_local")
        return self._reply_random('local')

    def reply_random_home(self) -> int:
        """Reply to a random post from the home timeline."""
        logger.debug(f"{self.acct} reply_random_home")
        return self._reply_random('home')

    def reply_random_public(self) -> int:
        """Reply to a random post from the public timeline."""
        logger.debug(f"{self.acct} reply_random_public")
        return self._reply_random('public')

    def _reply_random(self, timeline: str) -> int:
        """Read the given timeline, pick a post at random, reply to it.

        Collects up to roughly `max_posts` posts, `chunk_size` at a time.
        Returns 0 after replying (or logging, in a dry run) and 1 when the
        timeline had no posts to reply to.
        """
        chunk_size = 20
        max_posts = 100
        timeline_list = []

        # The first page goes through read_timeline() so the read marker is
        # updated once; later pages are fetched directly, each one asking for
        # posts older than the last post already collected.
        page = self.read_timeline(timeline=timeline, limit=chunk_size)
        while len(page) > 0:
            timeline_list.extend(page)
            if len(timeline_list) >= max_posts:
                break
            page = self.timeline(timeline=timeline, limit=chunk_size,
                                 max_id=page[-1].id)

        if not timeline_list:
            logger.debug(f"{self.acct} can't reply! Nothing on the {timeline} timeline.")
            return 1

        reply_post = random.choice(timeline_list)
        message = self.new_message()
        if self._is_dry_run():
            logger.info(f"{self.acct} replied to {timeline} {reply_post.id}")
        else:
            logger.debug(f"{self.acct} replied to {reply_post.account.acct} {reply_post.uri}")
            self.status_post( message, in_reply_to_id=reply_post.id, visibility='public' )
        return 0

    def follow_random_local(self) -> int:
        """Follow a random account from the credentials file.

        Candidates are all known accounts except this one and those already
        followed. Returns 0, including when there is nobody left to follow.
        """
        logger.debug(f"{self.acct} follow_random_local")
        followlist = self.account_following(id=self.id)
        followed_people = {account.acct for account in followlist}

        # Set difference, not symmetric difference: `^` would add followed
        # accounts that are not in the credentials file to the candidates.
        potentials = set(self.credentials) - {self.acct} - followed_people
        if not potentials:
            logger.debug(f"{self.acct} can't follow! Nobody left to follow.")
            return 0

        follow_target = random.choice(sorted(potentials))
        target_dict = self.account_lookup(follow_target)
        if self._is_dry_run():
            logger.info(f"{self.acct} will follow {follow_target} (id: {target_dict.id})")
        else:
            self.account_follow(target_dict.id)
        return 0

    def unfollow_random(self) -> int:
        """Unfollow a random followed account. Returns 0 in every case."""
        logger.debug(f"{self.acct} unfollow_random")
        followlist = self.account_following(id=self.id)
        # Check for emptiness before choosing: random.choice([]) raises.
        if len(followlist) == 0:
            logger.debug(f"{self.acct} can't unfollow! Not following anyone.")
            return 0

        unfollowee = random.choice(followlist)
        if self._is_dry_run():
            logger.info(f"{self.acct} unfollows {unfollowee.acct} (id: {unfollowee.id})")
        else:
            self.account_unfollow(unfollowee.id)
        return 0

    def favourite_random_local(self) -> int:
        """Favourite a random post from the local timeline."""
        logger.debug(f"{self.acct} favourite_random_local")
        return self._favourite(timeline='local')

    def favourite_random_public(self) -> int:
        """Favourite a random post from the public timeline."""
        logger.debug(f"{self.acct} favourite_random_public")
        return self._favourite(timeline='public')

    def favourite_random_home(self) -> int:
        """Favourite a random post from the home timeline."""
        logger.debug(f"{self.acct} favourite_random_home")
        return self._favourite(timeline='home')

    def _favourite(self, timeline: str) -> int:
        """Favourite a random post among the first page of `timeline`.

        Returns 1 when the timeline is empty, otherwise 0.
        """
        chunk_size = 20
        timeline_list = self.read_timeline(timeline=timeline, limit=chunk_size)
        if len(timeline_list) == 0:
            return 1
        fav_post = random.choice(timeline_list)
        if self._is_dry_run():
            logger.info(f"{self.acct} favourites {fav_post.id}")
        else:
            logger.debug(f"{self.acct} favourites {fav_post.id}")
            self.status_favourite(id=fav_post.id)
        return 0

    def boost_random_local(self) -> int:
        """Boost a random post from the local timeline."""
        logger.debug(f"{self.acct} boost_random_local")
        return self._boost(timeline='local')

    def boost_random_public(self) -> int:
        """Boost a random post from the public timeline."""
        logger.debug(f"{self.acct} boost_random_public")
        return self._boost(timeline='public')

    def boost_random_home(self) -> int:
        """Boost a random post from the home timeline."""
        logger.debug(f"{self.acct} boost_random_home")
        return self._boost(timeline='home')

    def _boost(self, timeline: str) -> int:
        """Boost (reblog) a random post among the first page of `timeline`.

        Returns 1 when the timeline is empty, otherwise 0.
        """
        chunk_size = 20
        timeline_list = self.read_timeline(timeline=timeline, limit=chunk_size)
        if len(timeline_list) == 0:
            return 1
        boost_post = random.choice(timeline_list)
        if self._is_dry_run():
            logger.info(f"{self.acct} boosts {boost_post.id}")
        else:
            logger.debug(f"{self.acct} boosts {boost_post.id}")
            self.status_reblog(id=boost_post.id)
        return 0

    def report_random_local(self) -> int:
        """Placeholder: only logs that it was called. Returns 0."""
        logger.debug(f"{self.acct} report_random_local")
        return 0

    def set_display_name(self) -> int:
        """Change the display name to a random entry of `usernames`.

        In a dry run the change is only logged. Returns 0.
        """
        newname = random.choice(usernames)
        if self._is_dry_run():
            logger.info(f"{self.acct} would set display_name to {newname}")
            return 0
        logger.debug(f"{self.acct} set_display_name to {newname}")
        self.account_update_credentials(display_name=newname)
        return 0

    def set_bio(self) -> int:
        """Replace the account bio with a freshly generated message.

        In a dry run the change is only logged. Returns 0.
        """
        newbio = self.new_message()
        if self._is_dry_run():
            logger.info(f"{self.acct} would set_bio to {len(newbio)}-character bio")
            return 0
        logger.debug(f"{self.acct} set_bio to {len(newbio)}-character bio")
        self.account_update_credentials(note=newbio)
        return 0

    def random_interaction(self):
        """Choose one possible interaction according to the weights, and do it."""
        interactions = {
            self.reply_random_local: 2,
            self.reply_random_home: 2,
            self.reply_random_public: 2,
            self.follow_random_local: 2,
            self.unfollow_random: 1,
            self.toot_plain_public: 1,
            self.toot_tagged_public: 4,
            self.toot_plain_unlisted: 1,
            self.favourite_random_local: 2,
            self.favourite_random_home: 2,
            self.favourite_random_public: 2,
            self.boost_random_local: 2,
            self.boost_random_home: 2,
            self.boost_random_public: 2,
            self.set_display_name: 1,
            self.set_bio: 1,
            self.report_random_local: 1
        }
        # random.choices returns a list; k defaults to 1, so take its only item.
        chosen = random.choices(population=list(interactions.keys()),
                                weights=list(interactions.values()))[0]
        chosen()
def daemon_main(tooter: 'Tooter | list[Tooter]', interval: float = 600):
    """Run forever, performing one random interaction per client per cycle.

    Args:
        tooter: a single Tooter, or an iterable of Tooters.
        interval: seconds to sleep between cycles.

    A failing interaction is logged with its traceback and does not stop the
    daemon. This function does not return.
    """
    tooters = [tooter] if isinstance(tooter, Tooter) else list(tooter)
    while True:
        for t in tooters:
            try:
                t.random_interaction()
            except Exception:
                # Top-level boundary of a long-running process: record the
                # failure and carry on with the next client.
                logger.exception(f"{t.acct} interaction failed")
        time.sleep(interval)


def main():
    """Run from a command line.

    Parses the options, configures logging, loads the source texts and the
    credentials file, then either performs one interaction per account
    (--once) or hands all accounts to daemon_main().

    Returns:
        0 after a --once run; 2 when the text directory is missing or is not
        a directory. In daemon mode the call does not return normally.
    """
    global args
    global logger
    parser = argparse.ArgumentParser(
        description='Randomly interact with a Mastodon timeline.')
    parser.add_argument( '-d', '--debug', action='store_true',
        help='Enable debugging messages.')
    parser.add_argument( '-o', '--once', action='store_true',
        help='Run once and exit. Default is to run as a daemon.')
    parser.add_argument( '-n', '--dry-run', action='store_true',
        help='Don\'t toot. Just show what would be done.')
    parser.add_argument( '-D', '--directory', default='text',
        help='Directory with text source files for toot bodies.')
    parser.add_argument( 'file', type=argparse.FileType('r'),
        help='TOML file with user credentials (see server-util/README.md).')
    args = parser.parse_args()

    logger = logging.getLogger(__name__)
    logging.basicConfig(format='%(levelname)s\t%(message)s')
    if args.debug:
        logger.setLevel(logging.DEBUG)
    elif args.dry_run:
        # Dry-run reports are emitted at INFO; at ERROR they would be hidden
        # and a dry run would show nothing.
        logger.setLevel(logging.INFO)
    else:
        logger.setLevel(logging.ERROR)

    p = Path(args.directory)
    if not p.exists():
        logger.error(f'{sys.argv[0]}: {args.directory}: No such file or directory')
        return 2
    if not p.is_dir():
        logger.error(f'{sys.argv[0]}: {args.directory}: Is not a directory')
        return 2

    Tooter.load_src_files(p)
    try:
        Tooter.load_credentials(args.file)
    finally:
        # argparse opened the file for us; release it once it has been read
        # (but never close the process's stdin, which `-` maps to).
        if args.file is not sys.stdin:
            args.file.close()

    if args.once:
        for name in Tooter.credentials:
            t = Tooter(name)
            t.random_interaction()
        return 0

    # Daemon mode: build every client up front and pass them all on.
    tooters = [Tooter(name) for name in Tooter.credentials]
    daemon_main(tooters)
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main())