diff --git a/toota-palooza/cli.py b/toota-palooza/cli.py index 8c9ca2e..e12b95e 100644 --- a/toota-palooza/cli.py +++ b/toota-palooza/cli.py @@ -18,94 +18,108 @@ from pprint import pprint # import toota-palooza -def mastodonInit(): - """Connect to the Mastodon instance based on .env files""" +class Server(Mastodon): + dotenv_loaded = False - server = Mastodon( - client_id = '.toota-palooza.env', - api_base_url = os.getenv('MD_HOST') - ) + def __init__(self): + self.maybe_load_dotenv() - load_dotenv() - server.log_in( - os.getenv('MD_USER'), - os.getenv('MD_PASS'), - to_file = '.toota-palooza-usercred.env' - ) - return(server) + self.client_id = '.toota-palooza.env' + self.hostname = os.getenv('MD_HOST') + self.username = os.getenv('MD_USER') + self.passwd = os.getenv('MD_PASS') + self.to_file = '.toota-palooza-usercred.env' + + super().__init__(client_id=self.client_id, api_base_url=self.hostname) + + self.log_in( + self.username, + self.passwd, + to_file=self.to_file + ) + + def post(self, message): + self.status_post(message, visibility='public') + + @classmethod + def maybe_load_dotenv(cls): + if not cls.dotenv_loaded: + load_dotenv() + cls.dotenv_loaded = True -def checkPublicTimeline( server): +def check_public_timeline(server): """Do one run. Connect to the database, connect to the server, get the public timeline. Look at users and check to see if any are potential impersonators. Then exit.""" - # Here's the idea: pick a chunkSize. Ask the server for that many toots off the public + # Here's the idea: pick a chunk_size. Ask the server for that many toots off the public # timeline. As long as the server gives us as many as we asked for, keep trying. # as soon as we get less than we asked for, quit. # # XXX Not sure about rate-limiting # - chunkSize = 20 - maxPosts = 1000 - timelineList = [] - useridList = {} - total = 0 - calls = 0 - while( total < maxPosts ): - timelineList = server.timeline(timeline='public', since_id=latestId, limit=chunkSize) - calls = calls + 1 - for post in timelineList: + chunk_size = 20 + max_posts = 1000 + timeline_list = [] + userid_list = {} + total = 0 + calls = 0 + domain_def = server.hostname.split('/')[2] + while total < max_posts: + timeline_list = server.timeline(timeline='public', limit=chunk_size) + calls += 1 + for post in timeline_list: userid = post.account.acct - username=userid.split('@')[0] + name_and_domain = userid.split('@', 1) + username = name_and_domain[0] - try: - domain=userid.split('@')[1] - except IndexError: + if len(name_and_domain) == 2: + domain = name_and_domain[1] + else: # if there is no domain, then it's a local account - domain=os.getenv('MD_HOST').split('/')[2] - useridList[userid] = (username,domain,post.account.display_name,post.account.bot,post.url) - latestId=post.id + domain = domain_def + userid_list[userid] = (username, domain, + post.account.display_name, post.account.bot, post.url) - if( len(timelineList) < 1): + if len(timeline_list) == 0: # We got fewer than we asked for. Drop out of the loop. - total = total + len(timelineList) break - else: - # record how many we did, and go again. - total = total + len(timelineList) - timelineList = [] + # record how many we did, and go again. + total += len(timeline_list) + timeline_list.clear() # Ok, we got them all, time to insert - server.status_post( "{} calls to get {} posts, {} users processed".format(calls, total, len(useridList)), - visibility='public' ) + return (f'{calls} calls to get {total} posts,' + +f' {len(userid_list)} users processed') def daemon_main(): """Run from a command line.""" - server = mastodonInit() - while(True): + while True: # do a thing time.sleep(600) -def once(): +def once(server): """Run from a command line.""" - server = mastodonInit() + message = check_public_timeline(server) + server.post(message) return 0 def main(): parser = argparse.ArgumentParser( - description='Check for suspicious impersonators.' - ) + description='Check for suspicious impersonators.') parser.add_argument( '-d', '--debug', action='store_true', help='Enable debugging messages.') parser.add_argument( '-o', '--once', action='store_true', help='Run once and exit. Default is to run as a daemon.') args = parser.parse_args() - if( args.once ): - exit(once()) - else: - daemon_main() + server = Server() + + if args.once: + return once(server) + + daemon_main(server) if __name__ == '__main__': - exit(once()) \ No newline at end of file + sys.exit(main()) \ No newline at end of file