|
|
- import json
- import re
- import pathlib
- from warnings import warn
-
- import numpy as np
- import pandas as pd
- from tqdm import tqdm
-
- from moodle_dl.moodle_connector.request_helper import RequestHelper
- from moodle_dl.moodle_connector.first_contact_handler import FirstContactHandler
- from moodle_dl.moodle_connector.sso_token_receiver import extract_token
-
-
-
def get_cds(c):
    """Extract the parenthesised code (called ``cds`` here) from a course name.

    The full name is expected to look like ``B<digits> (B<digits>) - <title>``;
    the value returned is the ``B<digits>`` token found inside the
    parentheses.

    Args:
        c: Any object exposing a ``fullname`` string attribute.

    Returns:
        The parenthesised code as a string, or ``None`` when ``fullname``
        does not start with the expected pattern.
    """
    # Raw string: "\d" and "\(" inside a plain literal are invalid escape
    # sequences and trigger a SyntaxWarning on recent Python versions.
    match = re.match(r'^B\d+ \((B\d+)\) -.*', c.fullname)
    return match.group(1) if match else None
-
def normalize_name(email, namestring):
    """Split a display name into a capitalised ``(firstname, lastname)`` pair.

    Three strategies are tried in order:

    1. Address at ``stud.unifi.it`` whose local part contains a dot: the
       text before the first dot is the first-name part, the remainder the
       last-name part. Every word of ``namestring`` that occurs (as a
       case-insensitive substring) in one of those parts is assigned to the
       corresponding name, keeping the word order of ``namestring``.
    2. Exactly two words: read as ``"<lastname> <firstname>"``.
    3. Anything else: the first name is left empty and all words are
       returned together as the last name.

    Args:
        email: E-mail address of the user.
        namestring: Full display name, words separated by whitespace.

    Returns:
        A ``(firstname, lastname)`` tuple of strings.
    """
    # rpartition never raises, unlike unpacking ``email.split('@')`` which
    # fails on addresses with zero or several '@'.
    mailuser, _, domain = email.lower().rpartition('@')
    # split() without an argument drops repeated/leading/trailing blanks;
    # split(' ') would produce empty "words" that pass every substring test.
    words = namestring.lower().split()

    if domain == 'stud.unifi.it' and '.' in mailuser:
        first_part, _, last_part = mailuser.partition('.')
        firstname = " ".join(w.capitalize() for w in words if w in first_part)
        lastname = " ".join(w.capitalize() for w in words if w in last_part)
        return firstname, lastname

    if len(words) == 2:
        # Index the word list, not the raw string: ``namestring[0]`` is a
        # single character, not the first word.
        return words[1].capitalize(), words[0].capitalize()

    return "", " ".join(w.capitalize() for w in words)
-
-
def is_student(u):
    """Tell whether an enrolled-user record is treated as a student.

    A record qualifies when one of its ``roles`` entries has ``roleid`` 5
    and its ``email`` does not end with ``@unifi.it``.

    Args:
        u: Mapping with a ``roles`` list (each entry carrying ``roleid``)
            and an ``email`` string; the e-mail is only read when role 5
            is present.

    Returns:
        ``True`` if the record qualifies, ``False`` otherwise.
    """
    assigned = tuple(role['roleid'] for role in u['roles'])
    if 5 not in assigned:
        return False
    # NOTE(review): presumably ``@unifi.it`` marks staff accounts that also
    # hold role 5 in a course -- confirm.
    return not u['email'].endswith('@unifi.it')
-
class MailListDownloader():
    """Collect student e-mail addresses from the courses of a Moodle account.

    Requests go to the host ``e-l.unifi.it`` through a ``RequestHelper``;
    course discovery goes through a ``FirstContactHandler``.
    """

    def __init__(self, url):
        """Set up the request helpers from a token URL.

        Args:
            url: Value handed to ``extract_token``, which yields the
                ``(token, secret_token)`` pair stored on the instance.
        """
        self.token, self.secret_token = extract_token(url)
        self.rh = RequestHelper('e-l.unifi.it', token=self.token)
        self.fch = FirstContactHandler(self.rh)

    def course_emails(self, courseid):
        """Return the students enrolled in one course.

        Calls the ``core_enrol_get_enrolled_users`` web-service function and
        keeps the records that carry an e-mail and pass ``is_student``.
        A one-line summary is printed before returning.

        Args:
            courseid: Course identifier sent as ``courseid``.

        Returns:
            A list of dicts with the keys ``firstname``, ``lastname`` and
            ``email``.
        """
        # Assumes the call returns an iterable of dict-like user records --
        # TODO confirm against RequestHelper.post_REST.
        users_raw = self.rh.post_REST(
            'core_enrol_get_enrolled_users', {'courseid': courseid}
        )
        users = []
        skipped = 0

        for u in users_raw:
            if 'fullname' not in u:
                # Reported on stdout; not included in the "skipped" count.
                print(f'Malformed record u={u}')
                continue
            if 'email' not in u or not is_student(u):
                skipped += 1
                continue

            firstname, lastname = normalize_name(u['email'], u['fullname'])
            users.append(
                {'firstname': firstname, 'lastname': lastname, 'email': u['email']}
            )

        print(f"\t{len(users)} found, {skipped} skipped.")
        return users

    def course_list(self):
        """List the courses of the current user.

        Returns:
            A list of dicts with the keys ``cid`` (course id), ``cds``
            (result of ``get_cds``, possibly ``None``) and ``fullname``.
        """
        # The second element (version) is not needed here.
        userid, _version = self.fch.fetch_userid_and_version()
        courses = self.fch.fetch_courses(userid)
        return [
            {'cid': c.id, 'cds': get_cds(c), 'fullname': c.fullname}
            for c in courses
        ]

    def download_from_new_courses(self, path):
        """Fetch e-mails from courses not processed yet and persist them.

        Two files derived from ``path`` (its last suffix is replaced) hold
        the state between runs:

        * ``<path>.downloaded.json`` -- JSON list of course ids already
          processed;
        * ``<path>.db.hdf`` -- HDF store whose ``emails`` key holds the
          accumulated table.

        Courses without a ``cds`` code or already listed as downloaded are
        skipped. Both files are rewritten once, after all courses have been
        processed.

        Args:
            path: Base path (string or path-like) for the two state files.

        Returns:
            The accumulated ``pandas.DataFrame`` with the columns ``cds``,
            ``firstname``, ``lastname`` and ``email``.
        """
        path = pathlib.Path(path)
        downloaded_file = path.with_suffix('.downloaded.json')
        database_file = path.with_suffix('.db.hdf')

        if downloaded_file.is_file():
            with downloaded_file.open() as f:
                downloaded = json.load(f)
        else:
            downloaded = []

        if database_file.is_file():
            db = pd.read_hdf(database_file, 'emails')
        else:
            # Start from an empty table that already has the final columns.
            db = pd.DataFrame()
            for column in ('cds', 'firstname', 'lastname', 'email'):
                db[column] = []

        for c in tqdm(self.course_list()):
            cid, cds = c['cid'], c['cds']
            if not cds or cid in downloaded:
                continue

            print(f"Downloading {c['fullname']}")
            old_len = len(db)
            emails = pd.DataFrame(self.course_emails(cid))
            emails['cds'] = cds
            # Identical (cds, firstname, lastname, email) rows are kept once.
            db = pd.concat([db, emails]).drop_duplicates().reset_index(drop=True)
            downloaded.append(cid)

            # Statistics are computed only for a course that was actually
            # downloaded, so ``emails`` is always bound here.
            new = len(db) - old_len
            duplicates = len(emails) - new
            print(f"\t{new} new, {duplicates} duplicates.")

        with downloaded_file.open('w') as f:
            json.dump(downloaded, f)
        db.to_hdf(database_file, key='emails')
        return db
-
|