''' Command line module for calling toota-palooza to do its work ''' __all__ = ['toota-palooza'] __author__ = 'Paco Hope ' __date__ = '25 November 2022' __version__ = '1.0' __copyright__ = 'Copyright © 2022 Paco Hope. See LICENSE for details.' from mastodon import Mastodon from dotenv import load_dotenv import os import time import argparse import sys from pprint import pprint # import toota-palooza class Server(Mastodon): dotenv_loaded = False def __init__(self): self.maybe_load_dotenv() self.client_id = '.toota-palooza.env' self.hostname = os.getenv('MD_HOST') self.username = os.getenv('MD_USER') self.passwd = os.getenv('MD_PASS') self.to_file = '.toota-palooza-usercred.env' super().__init__(client_id=self.client_id, api_base_url=self.hostname) self.log_in( self.username, self.passwd, to_file=self.to_file ) def post(self, message): self.status_post(message, visibility='public') @classmethod def maybe_load_dotenv(cls): if not cls.dotenv_loaded: load_dotenv() cls.dotenv_loaded = True def check_public_timeline(server): """Do one run. Connect to the database, connect to the server, get the public timeline. Look at users and check to see if any are potential impersonators. Then exit.""" # Here's the idea: pick a chunk_size. Ask the server for that many toots off the public # timeline. As long as the server gives us as many as we asked for, keep trying. # as soon as we get less than we asked for, quit. # # XXX Not sure about rate-limiting # chunk_size = 20 max_posts = 1000 timeline_list = [] userid_list = {} total = 0 calls = 0 domain_def = server.hostname.split('/')[2] while total < max_posts: timeline_list = server.timeline(timeline='public', limit=chunk_size) calls += 1 for post in timeline_list: userid = post.account.acct name_and_domain = userid.split('@', 1) username = name_and_domain[0] if len(name_and_domain) == 2: domain = name_and_domain[1] else: # if there is no domain, then it's a local account domain = domain_def userid_list[userid] = (username, domain, post.account.display_name, post.account.bot, post.url) if len(timeline_list) == 0: # We got fewer than we asked for. Drop out of the loop. break # record how many we did, and go again. total += len(timeline_list) timeline_list.clear() # Ok, we got them all, time to insert return (f'{calls} calls to get {total} posts,' +f' {len(userid_list)} users processed') def daemon_main(): """Run from a command line.""" while True: # do a thing time.sleep(600) def once(server): """Run from a command line.""" message = check_public_timeline(server) server.post(message) return 0 def main(): parser = argparse.ArgumentParser( description='Check for suspicious impersonators.') parser.add_argument( '-d', '--debug', action='store_true', help='Enable debugging messages.') parser.add_argument( '-o', '--once', action='store_true', help='Run once and exit. Default is to run as a daemon.') args = parser.parse_args() server = Server() if args.once: return once(server) daemon_main(server) if __name__ == '__main__': sys.exit(main())