import json
|
|
import re
|
|
import pathlib
|
|
from warnings import warn
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from tqdm import tqdm
|
|
|
|
from moodle_dl.moodle_connector.request_helper import RequestHelper
|
|
from moodle_dl.moodle_connector.first_contact_handler import FirstContactHandler
|
|
from moodle_dl.moodle_connector.sso_token_receiver import extract_token
|
|
|
|
|
|
|
|
def get_cds(c):
    """Extract the parenthesised ``B<digits>`` code from a course full name.

    The full name is expected to look like ``"B012345 (B123) - Title"``;
    the code between the parentheses is what gets returned.
    NOTE(review): "cds" presumably stands for the degree-programme code --
    confirm against the site's course naming scheme.

    Args:
        c: Object exposing a ``fullname`` string attribute.

    Returns:
        The captured code as a string, or ``None`` when the full name does
        not follow the expected pattern.
    """
    # Raw string: "\d" and "\(" are regex escapes, not string escapes.
    match = re.match(r'^B\d+ \((B\d+)\) -.*', c.fullname)
    if match is None:
        return None
    return match.group(1)
|
|
|
|
def normalize_name(email, namestring):
    """Split a display name into capitalised first and last names.

    Three strategies are tried in order:

    1. For ``stud.unifi.it`` addresses whose local part is ``first.last``,
       every word of ``namestring`` is assigned to the first or the last
       name depending on which half of the local part contains it. The
       check is a substring match, so ``rossi`` still matches ``rossi1``.
    2. A name made of exactly two words is read as ``"Last First"``.
    3. Otherwise the whole name is returned as the last name.

    Args:
        email: E-mail address of the user.
        namestring: Full name as one whitespace-separated string.

    Returns:
        A ``(firstname, lastname)`` tuple of strings; ``firstname`` is the
        empty string when the name could not be split.
    """
    # rpartition never raises: an address without "@" simply has no
    # recognised domain and falls through to the generic handling below.
    mailuser, _, domain = email.rpartition('@')
    # split() with no argument drops the empty tokens that repeated spaces
    # would produce; an empty token is a substring of everything and would
    # otherwise leak into both names.
    allnames = namestring.lower().split()

    # Only trust the address when it has exactly the "first.last" shape;
    # anything else is handled by the generic rules instead of crashing.
    if domain == 'stud.unifi.it' and mailuser.count('.') == 1:
        mail_first, mail_last = mailuser.split('.')
        firstname = " ".join(n.capitalize() for n in allnames if n in mail_first)
        lastname = " ".join(n.capitalize() for n in allnames if n in mail_last)
        return firstname, lastname

    if len(allnames) == 2:
        # Index the list of words, not the characters of the raw string.
        lastname, firstname = (n.capitalize() for n in allnames)
        return firstname, lastname

    return "", " ".join(n.capitalize() for n in allnames)
|
|
|
|
|
|
def is_student(u):
    """Tell whether an enrolled-user record belongs to a student.

    A record qualifies when one of its ``roles`` entries has ``roleid`` 5
    and its ``email`` does not end with ``@unifi.it``.
    NOTE(review): 5 is presumably the Moodle student role id on this site
    -- confirm.

    Args:
        u: Mapping with a ``roles`` list of mappings (each holding a
            ``roleid``) and, when role 5 is present, an ``email`` string.

    Returns:
        ``True`` for a student record, ``False`` otherwise.
    """
    role_ids = tuple(role['roleid'] for role in u['roles'])
    if 5 not in role_ids:
        # The e-mail is deliberately not looked at without role 5.
        return False
    return not u['email'].endswith('@unifi.it')
|
|
|
|
class MailListDownloader():
    """Collect student e-mail addresses from the courses of a Moodle account.

    The instance keeps the token pair returned by ``extract_token``, a
    ``RequestHelper`` bound to the Moodle host and a ``FirstContactHandler``
    built on top of that helper.
    """

    def __init__(self, url, domain='e-l.unifi.it'):
        """Build the request helpers from a token-carrying URL.

        Args:
            url: Value handed to ``extract_token`` to obtain the
                ``(token, secret_token)`` pair (presumably the SSO
                redirect URL -- confirm).
            domain: Host passed as first argument to ``RequestHelper``.
                Defaults to the host this class was originally written
                for.
        """
        self.token, self.secret_token = extract_token(url)
        self.rh = RequestHelper(domain, token=self.token)
        self.fch = FirstContactHandler(self.rh)

    def course_emails(self, courseid):
        """Return name and e-mail of every student enrolled in a course.

        Records are fetched with the ``core_enrol_get_enrolled_users`` REST
        function. A record without ``fullname`` is reported as malformed;
        a record without ``email`` or rejected by ``is_student`` is counted
        as skipped. A one-line summary is printed at the end.

        Args:
            courseid: Course id sent to the REST function.

        Returns:
            A list of dicts with the keys ``firstname``, ``lastname`` and
            ``email``.
        """
        users_raw = self.rh.post_REST(
            'core_enrol_get_enrolled_users', dict(courseid=courseid))
        users = []
        skipped = 0

        for u in users_raw:
            if 'fullname' not in u:
                # Malformed records are reported but not counted as skipped.
                print(f'Malformed record u={u}')
                continue
            if 'email' not in u or not is_student(u):
                skipped += 1
                continue

            firstname, lastname = normalize_name(u['email'], u['fullname'])
            users.append(dict(firstname=firstname, lastname=lastname, email=u['email']))

        print(f"\t{len(users)} found, {skipped} skipped.")
        return users

    def course_list(self):
        """List the courses returned by ``fetch_courses`` for the current user.

        Returns:
            A list of dicts with the keys ``cid`` (course id), ``cds`` (the
            result of ``get_cds``, ``None`` when the name has no code) and
            ``fullname``.
        """
        # Only the user id is needed; the version is ignored.
        userid, _version = self.fch.fetch_userid_and_version()
        courses = self.fch.fetch_courses(userid)
        return [dict(cid=c.id, cds=get_cds(c), fullname=c.fullname) for c in courses]

    def download_from_new_courses(self, path):
        """Fetch e-mails from courses not processed yet and persist them.

        Two files are derived from ``path`` by replacing its suffix:
        ``<stem>.downloaded.json`` holds the ids of the courses already
        processed and ``<stem>.db.hdf`` holds the accumulated table under
        the key ``emails``. Courses without a ``cds`` code, or whose id is
        already listed, are skipped.

        Both files are rewritten when the method ends, including when a
        course fails part-way, so the courses completed before the failure
        are not fetched again on the next run.

        Args:
            path: Base path (string or path-like) of the two state files.

        Returns:
            The updated ``pandas.DataFrame`` with the columns ``cds``,
            ``firstname``, ``lastname`` and ``email``.
        """
        path = pathlib.Path(path)
        downloaded_file = path.with_suffix('.downloaded.json')
        database_file = path.with_suffix('.db.hdf')

        if downloaded_file.is_file():
            with downloaded_file.open() as f:
                downloaded = json.load(f)
        else:
            downloaded = []

        if database_file.is_file():
            db = pd.read_hdf(database_file, 'emails')
        else:
            db = pd.DataFrame(columns=['cds', 'firstname', 'lastname', 'email'])

        try:
            for c in tqdm(self.course_list()):
                cid, cds = c['cid'], c['cds']
                if not cds or cid in downloaded:
                    continue

                print(f"Downloading {c['fullname']}")
                old_len = len(db)
                emails = pd.DataFrame(self.course_emails(cid))
                emails['cds'] = cds
                db = pd.concat([db, emails]).drop_duplicates().reset_index(drop=True)
                # Mark the course as done only once its rows are merged.
                downloaded.append(cid)

                new = len(db) - old_len
                duplicates = len(emails) - new
                print(f"\t{new} new, {duplicates} duplicates.")
        finally:
            # Persist whatever was completed, even if a later course raised.
            with downloaded_file.open('w') as f:
                json.dump(downloaded, f)
            db.to_hdf(database_file, key='emails')

        return db
|
|
|