Renamed Server to Tooter

main
Nicholas Hope 2022-11-25 19:11:10 -05:00
parent 08c88b35e2
commit 5faf193dee
1 changed files with 41 additions and 36 deletions

View File

@ -9,46 +9,50 @@ __version__ = '1.0'
__copyright__ = 'Copyright © 2022 Paco Hope. See LICENSE for details.' __copyright__ = 'Copyright © 2022 Paco Hope. See LICENSE for details.'
from mastodon import Mastodon from mastodon import Mastodon
from dotenv import load_dotenv import toml
import os import os
import time import time
import argparse import argparse
import sys import sys
from pprint import pprint
# import toota-palooza
class Server(Mastodon): class Tooter(Mastodon):
dotenv_loaded = False credentials: dict = {}
hostname: str = ''
def __init__(self): def __init__(self, name: str):
self.maybe_load_dotenv() cred_dict = self.credentials[name]
self.username = cred_dict['addr']
self.client_id = '.toota-palooza.env' self.password = cred_dict['pass']
self.hostname = os.getenv('MD_HOST') self.client_id = f'.toota-palooza.env'
self.username = os.getenv('MD_USER') self.cred_file = f'.toota-palooza-usercred-{name}.env'
self.passwd = os.getenv('MD_PASS')
self.to_file = '.toota-palooza-usercred.env'
super().__init__(client_id=self.client_id, api_base_url=self.hostname) super().__init__(client_id=self.client_id, api_base_url=self.hostname)
self.log_in( self.log_in(
self.username, self.username,
self.passwd, self.password,
to_file=self.to_file to_file=self.cred_file
) )
def post(self, message):
self.status_post(message, visibility='public')
@classmethod @classmethod
def maybe_load_dotenv(cls): def load_credentials(cls, file: str) -> None:
if not cls.dotenv_loaded: as_dict = toml.load(file)
load_dotenv() try:
cls.dotenv_loaded = True cls.hostname = as_dict.pop('host')
except KeyError:
raise KeyError('must provide a hostname')
for username, fields in as_dict.items():
if not isinstance(fields, dict):
raise TypeError(f'{username} has no key/value pairs')
if 'addr' not in fields:
raise KeyError(f'`addr` field missing from {username}')
if 'pass' not in fields:
raise KeyError(f'`pass` field missing from {username}')
cls.credentials = as_dict
def check_public_timeline(server): def check_public_timeline(tooter: Tooter):
"""Do one run. Connect to the database, connect to the server, get the public timeline. """Do one run. Connect to the database, connect to the server, get the public timeline.
Look at users and check to see if any are potential impersonators. Then exit.""" Look at users and check to see if any are potential impersonators. Then exit."""
@ -64,9 +68,9 @@ def check_public_timeline(server):
userid_list = {} userid_list = {}
total = 0 total = 0
calls = 0 calls = 0
domain_def = server.hostname.split('/')[2] domain_def = tooter.hostname.split('/')[2]
while total < max_posts: while total < max_posts:
timeline_list = server.timeline(timeline='public', limit=chunk_size) timeline_list = tooter.timeline(timeline='public', limit=chunk_size)
calls += 1 calls += 1
for post in timeline_list: for post in timeline_list:
userid = post.account.acct userid = post.account.acct
@ -81,28 +85,27 @@ def check_public_timeline(server):
userid_list[userid] = (username, domain, userid_list[userid] = (username, domain,
post.account.display_name, post.account.bot, post.url) post.account.display_name, post.account.bot, post.url)
if len(timeline_list) == 0: posts_indexed = len(timeline_list)
if posts_indexed == 0:
# We got fewer than we asked for. Drop out of the loop. # We got fewer than we asked for. Drop out of the loop.
break break
# record how many we did, and go again. total += posts_indexed
total += len(timeline_list)
timeline_list.clear()
# Ok, we got them all, time to insert # Ok, we got them all, time to insert
return (f'{calls} calls to get {total} posts,' return (f'{calls} calls to get {total} posts,'
+f' {len(userid_list)} users processed') +f' {len(userid_list)} users processed')
def daemon_main(): def daemon_main(tooter: Tooter):
"""Run from a command line.""" """Run from a command line."""
while True: while True:
# do a thing # do a thing
time.sleep(600) time.sleep(600)
def once(server): def once(tooter: Tooter):
"""Run from a command line.""" """Run from a command line."""
message = check_public_timeline(server) message = check_public_timeline(tooter)
server.post(message) tooter.toot(message)
return 0 return 0
def main(): def main():
@ -114,12 +117,14 @@ def main():
help='Run once and exit. Default is to run as a daemon.') help='Run once and exit. Default is to run as a daemon.')
args = parser.parse_args() args = parser.parse_args()
server = Server() Tooter.load_credentials('users.toml')
if args.once: if args.once:
return once(server) t = Tooter('nick')
once(t)
return 0
daemon_main(server) daemon_main(t)
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main()) sys.exit(main())