2022-11-25 14:03:24 -05:00
|
|
|
'''
|
2022-11-27 09:09:18 -05:00
|
|
|
Command line module for calling tootapalooza to do its work
|
2022-11-25 14:03:24 -05:00
|
|
|
'''
|
|
|
|
|
2022-11-27 09:09:18 -05:00
|
|
|
__all__ = ['tootapalooza']
|
|
|
|
__author__ = 'Paco Hope <tootapalooza@filter.paco.to>'
|
2022-11-25 14:03:24 -05:00
|
|
|
__date__ = '25 November 2022'
|
|
|
|
__version__ = '1.0'
|
|
|
|
__copyright__ = 'Copyright © 2022 Paco Hope. See LICENSE for details.'
|
|
|
|
|
|
|
|
from mastodon import Mastodon
|
2022-11-25 19:11:10 -05:00
|
|
|
import toml
|
2022-11-25 14:03:24 -05:00
|
|
|
import os
|
|
|
|
import time
|
|
|
|
import argparse
|
|
|
|
import sys
|
2022-11-26 00:09:23 -05:00
|
|
|
from pathlib import Path
|
|
|
|
import random
|
2022-11-25 14:03:24 -05:00
|
|
|
|
2022-11-25 19:11:10 -05:00
|
|
|
class Tooter(Mastodon):
|
|
|
|
credentials: dict = {}
|
|
|
|
hostname: str = ''
|
2022-11-26 00:09:23 -05:00
|
|
|
files: dict = {}
|
2022-11-27 09:09:18 -05:00
|
|
|
client_id: str = '.tootapalooza.env'
|
2022-11-25 14:03:24 -05:00
|
|
|
|
2022-11-25 19:11:10 -05:00
|
|
|
def __init__(self, name: str):
|
2022-11-25 19:58:19 -05:00
|
|
|
self.name = name
|
|
|
|
cred_dict = self.credentials[self.name]
|
2022-11-25 19:11:10 -05:00
|
|
|
self.username = cred_dict['addr']
|
|
|
|
self.password = cred_dict['pass']
|
2022-11-27 09:09:18 -05:00
|
|
|
self.cred_file = f'.tootapalooza-usercred-{self.name}.env'
|
2022-11-25 14:03:24 -05:00
|
|
|
|
2022-11-25 16:06:03 -05:00
|
|
|
super().__init__(client_id=self.client_id, api_base_url=self.hostname)
|
2022-11-25 14:03:24 -05:00
|
|
|
|
2022-11-25 16:06:03 -05:00
|
|
|
self.log_in(
|
|
|
|
self.username,
|
2022-11-25 19:11:10 -05:00
|
|
|
self.password,
|
|
|
|
to_file=self.cred_file
|
2022-11-25 16:06:03 -05:00
|
|
|
)
|
|
|
|
|
|
|
|
@classmethod
|
2022-11-25 19:11:10 -05:00
|
|
|
def load_credentials(cls, file: str) -> None:
|
|
|
|
as_dict = toml.load(file)
|
|
|
|
try:
|
|
|
|
cls.hostname = as_dict.pop('host')
|
|
|
|
except KeyError:
|
|
|
|
raise KeyError('must provide a hostname')
|
|
|
|
for username, fields in as_dict.items():
|
|
|
|
if not isinstance(fields, dict):
|
|
|
|
raise TypeError(f'{username} has no key/value pairs')
|
|
|
|
if 'addr' not in fields:
|
|
|
|
raise KeyError(f'`addr` field missing from {username}')
|
|
|
|
if 'pass' not in fields:
|
|
|
|
raise KeyError(f'`pass` field missing from {username}')
|
|
|
|
cls.credentials = as_dict
|
|
|
|
|
2022-11-26 00:09:23 -05:00
|
|
|
@classmethod
|
|
|
|
def load_src_files(cls, dir: Path) -> None:
|
|
|
|
for item in dir.iterdir():
|
|
|
|
if not item.is_file():
|
|
|
|
continue
|
|
|
|
with item.open('r') as f:
|
|
|
|
cls.files[f.name] = f.readlines()
|
|
|
|
|
2022-11-25 19:11:10 -05:00
|
|
|
|
|
|
|
def check_public_timeline(tooter: Tooter):
|
2022-11-25 14:03:24 -05:00
|
|
|
"""Do one run. Connect to the database, connect to the server, get the public timeline.
|
|
|
|
Look at users and check to see if any are potential impersonators. Then exit."""
|
|
|
|
|
2022-11-25 16:06:03 -05:00
|
|
|
# Here's the idea: pick a chunk_size. Ask the server for that many toots off the public
|
2022-11-25 14:03:24 -05:00
|
|
|
# timeline. As long as the server gives us as many as we asked for, keep trying.
|
|
|
|
# as soon as we get less than we asked for, quit.
|
|
|
|
#
|
|
|
|
# XXX Not sure about rate-limiting
|
|
|
|
#
|
2022-11-25 16:06:03 -05:00
|
|
|
chunk_size = 20
|
|
|
|
max_posts = 1000
|
|
|
|
timeline_list = []
|
|
|
|
userid_list = {}
|
|
|
|
total = 0
|
|
|
|
calls = 0
|
2022-11-25 19:11:10 -05:00
|
|
|
domain_def = tooter.hostname.split('/')[2]
|
2022-11-25 16:06:03 -05:00
|
|
|
while total < max_posts:
|
2022-11-25 19:11:10 -05:00
|
|
|
timeline_list = tooter.timeline(timeline='public', limit=chunk_size)
|
2022-11-25 16:06:03 -05:00
|
|
|
calls += 1
|
|
|
|
for post in timeline_list:
|
2022-11-25 14:03:24 -05:00
|
|
|
userid = post.account.acct
|
2022-11-25 16:06:03 -05:00
|
|
|
name_and_domain = userid.split('@', 1)
|
|
|
|
username = name_and_domain[0]
|
2022-11-25 14:03:24 -05:00
|
|
|
|
2022-11-25 16:06:03 -05:00
|
|
|
if len(name_and_domain) == 2:
|
|
|
|
domain = name_and_domain[1]
|
|
|
|
else:
|
2022-11-25 14:03:24 -05:00
|
|
|
# if there is no domain, then it's a local account
|
2022-11-25 16:06:03 -05:00
|
|
|
domain = domain_def
|
|
|
|
userid_list[userid] = (username, domain,
|
|
|
|
post.account.display_name, post.account.bot, post.url)
|
2022-11-25 14:03:24 -05:00
|
|
|
|
2022-11-25 19:11:10 -05:00
|
|
|
posts_indexed = len(timeline_list)
|
|
|
|
if posts_indexed == 0:
|
2022-11-25 14:03:24 -05:00
|
|
|
# We got fewer than we asked for. Drop out of the loop.
|
|
|
|
break
|
2022-11-25 19:11:10 -05:00
|
|
|
total += posts_indexed
|
2022-11-25 14:03:24 -05:00
|
|
|
|
2022-11-25 14:54:07 -05:00
|
|
|
# Ok, we got them all, time to insert
|
2022-11-25 16:06:03 -05:00
|
|
|
return (f'{calls} calls to get {total} posts,'
|
|
|
|
+f' {len(userid_list)} users processed')
|
2022-11-25 14:54:07 -05:00
|
|
|
|
2022-11-25 19:11:10 -05:00
|
|
|
def daemon_main(tooter: Tooter):
|
2022-11-25 14:03:24 -05:00
|
|
|
"""Run from a command line."""
|
|
|
|
|
2022-11-25 16:06:03 -05:00
|
|
|
while True:
|
2022-11-25 14:03:24 -05:00
|
|
|
# do a thing
|
|
|
|
time.sleep(600)
|
|
|
|
|
2022-11-25 19:11:10 -05:00
|
|
|
def once(tooter: Tooter):
|
2022-11-25 14:03:24 -05:00
|
|
|
"""Run from a command line."""
|
2022-11-25 19:58:19 -05:00
|
|
|
# message = check_public_timeline(tooter)
|
|
|
|
message = f'{tooter.name} says hi!'
|
2022-11-26 00:19:52 -05:00
|
|
|
tooter.toot(
|
2022-11-26 00:19:40 -05:00
|
|
|
# random sentence...
|
2022-11-26 00:09:23 -05:00
|
|
|
random.choice(
|
2022-11-26 00:19:40 -05:00
|
|
|
# from a random line...
|
2022-11-26 00:09:23 -05:00
|
|
|
random.choice(
|
2022-11-26 00:19:40 -05:00
|
|
|
# from a random file...
|
2022-11-26 00:09:23 -05:00
|
|
|
random.choice(
|
|
|
|
list(tooter.files.values())
|
|
|
|
)
|
|
|
|
)
|
2022-11-26 00:19:40 -05:00
|
|
|
# stripped and split on full stops...
|
2022-11-26 00:09:23 -05:00
|
|
|
.strip().split('.')
|
|
|
|
)
|
2022-11-26 00:19:40 -05:00
|
|
|
# and of weird punctuation
|
2022-11-26 00:09:23 -05:00
|
|
|
.strip()
|
|
|
|
)
|
2022-11-25 14:03:24 -05:00
|
|
|
return 0
|
|
|
|
|
|
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser(
|
2022-11-25 19:33:45 -05:00
|
|
|
description='Randomly interact with a Mastodon timeline.')
|
2022-11-25 14:03:24 -05:00
|
|
|
parser.add_argument( '-d', '--debug', action='store_true',
|
|
|
|
help='Enable debugging messages.')
|
|
|
|
parser.add_argument( '-o', '--once', action='store_true',
|
|
|
|
help='Run once and exit. Default is to run as a daemon.')
|
2022-11-26 00:09:23 -05:00
|
|
|
parser.add_argument( '-D', '--directory', default='text',
|
|
|
|
help='Change directory to source files from.')
|
|
|
|
parser.add_argument( 'file', type=argparse.FileType('r'),
|
2022-11-25 19:33:45 -05:00
|
|
|
help='Change file to source users from.')
|
2022-11-25 14:03:24 -05:00
|
|
|
args = parser.parse_args()
|
|
|
|
|
2022-11-26 00:09:23 -05:00
|
|
|
p = Path(args.directory)
|
|
|
|
if not p.exists():
|
|
|
|
print(f'{sys.argv[0]}: {args.directory}: No such file or directory', file=sys.stderr)
|
|
|
|
return 2
|
|
|
|
if not p.is_dir():
|
|
|
|
print(f'{sys.argv[0]}: {args.directory}: Is not a directory', file=sys.stderr)
|
|
|
|
return 2
|
|
|
|
|
|
|
|
Tooter.load_src_files(p)
|
|
|
|
|
2022-11-25 19:33:45 -05:00
|
|
|
Tooter.load_credentials(args.file)
|
2022-11-25 16:06:03 -05:00
|
|
|
|
|
|
|
if args.once:
|
2022-11-25 19:58:19 -05:00
|
|
|
for name in Tooter.credentials:
|
|
|
|
t = Tooter(name)
|
|
|
|
once(t)
|
2022-11-25 19:11:10 -05:00
|
|
|
return 0
|
2022-11-25 16:06:03 -05:00
|
|
|
|
2022-11-25 19:11:10 -05:00
|
|
|
daemon_main(t)
|
2022-11-25 14:03:24 -05:00
|
|
|
|
|
|
|
if __name__ == '__main__':
|
2022-11-25 16:06:03 -05:00
|
|
|
sys.exit(main())
|