diff --git a/preparemoodle.py b/preparemoodle.py index 370f232353f010e5f3740e5b29fc856b47cfba59..a8d37f6d6229817119908ffa7a05f3ee89484bd5 100755 --- a/preparemoodle.py +++ b/preparemoodle.py @@ -1,185 +1,189 @@ #!/usr/bin/env python import csv -import os,time +import os +import time import shutil # copyfile, make_archive -import argparse, sys - - -def find_file(pattern, path): - if os.name == "posix": - import subprocess - - result = [line[2:] for line in subprocess.check_output( - "find " + path + " -type f -name " + pattern, - shell=True).splitlines()] - result = [tmp.decode("utf-8") for tmp in result] - - else: - import fnmatch +import argparse +import sys +import utils.matnum as utils + + +def find_unmatched_pdfs(infolder, matnums, nowarn): + """Finds matnumbers not present in CSV but in PDF folder + + Args: + infolder (str): path to input folder + matnums (list): list of matriculation numbers + nowarn (int): flag + """ + + print("\nSearching for matnumbers not present in CSV but in PDF folder:") + + # Loop over all PDFs: + notfoundmatnums = [] + for root, dirs, files in os.walk(infolder): + for pdffile in files: + if pdffile.endswith(".pdf"): + # Get matriculation number from file + matnum = utils.get_matnum(pdffile) + + # Search matriculation number in CSV + if matnum not in matnums: + notfoundmatnums.append(matnum) + if not nowarn: + print("Warning: {} not in CSV".format(matnum)) + + # Report back + if len(notfoundmatnums) > 0: + print('''Could not find following {} matnumbers in CSV: + {}'''.format(len(notfoundmatnums), ", ".join(notfoundmatnums))) + + print("Done.\n") + + +def main(args): + """Main routine + """ + + # Parse input arguments + parser = argparse.ArgumentParser(description=''' + prepares batch upload to Moodle via assignment module. + PDFs in folder 'in' are moved to folder 'tmp' with a certain folder structure and finally zipped to 'out'. + Attention: zip-archive 'out' will be overwritten in the following! + + ''') + parser.add_argument("-i", "--infolder", default="./pdfs_encrypted", + help="Input folder with PDFs. Default: ./pdfs_encrypted") + parser.add_argument("-c", "--csv", default="./Bewertungen.csv", + help="Moodle grading CSV file, needed to construct the folder names. Default: ./Bewertungen.csv") + parser.add_argument("-o", "--outzip", default="./moodle_feedbacks.zip", + help="Output zip archive. Default: ./moodle_feedbacks.zip") + parser.add_argument("-d", "--dry", action='store_true', + help="Flag for dry run, displays only the folder structure inside the archive moodle_feedbacks.zip") + parser.add_argument("-t", "--tmp", default="./tmp", + help="tmp folder. Default: ./tmp") + parser.add_argument("--nowarn", action='store_true', + help="Disables warnings") + + args = parser.parse_args(args) + infolder = args.infolder + csvfilename = args.csv + outzip = args.outzip + tmpfolder = os.path.join(args.tmp, "to_be_zipped_for_moodle") + dry = args.dry + nowarn = args.nowarn + + starttime = time.time() + + # Print status with total number of lines + numlines = 0 + with open(csvfilename, newline='') as csvfile: + numlines = sum(1 for line in csvfile) + + print('''Preparing for moodle upload +Processing {} lines + '''.format(numlines)) - result = [] - for root, _, files in os.walk(path): - for name in files: - if fnmatch.fnmatch(name, pattern): - result.append(os.path.join(root, name)) + dryout = "" + if dry: + print("Dry run\n") + else: + # Remove zip file + if os.path.exists(outzip): + os.remove(outzip) + + # Create temporary folder within given temporary directory + if not os.path.isdir(tmpfolder): + os.mkdir(tmpfolder) + + # Open CSV file + with open(csvfilename, newline='') as csvfile: + + numfoundpdfs = 0 + matnums = [] + line_cnt = 0 + print("Start iterating...", sep='', end='', flush=True) + + # Loop over all lines in CSV file + reader = csv.reader(csvfile, delimiter=',', quotechar='"') + next(reader) # skip header CSV line + for row in reader: + # Parse required fields from CSV line + # Moodle has its own internal ID per participant alongside + # matriculation number + moodleid = row[0] + moodleid = moodleid.replace("Teilnehmer/in", "") # German + moodleid = moodleid.replace("Participant ", "") # English + name = row[1] # Lastname, Firstname + matnum = row[2] # matriculation number (6-digit) + matnums.append(matnum) # save matriculation number for later + + # Copy PDF files + # Find all PDFs starting with matriculation number, e.g. + # '123456_Lastname_sheet.pdf' and '123456_Lastname_exam.pdf' + # If pdf files for current student exists, create a directory and + # copy the pdf files to it. The resulting directories can be + # uploaded to Moodle + longpdffiles = utils.find_file(matnum + "*.pdf", infolder) + if len(longpdffiles) > 0: # Found some file(s) + numfoundpdfs += 1 + + # Prepare folder + # For upload, Moodle accepts submission files per participant + folder = "{}_{}_assignsubmission_file_".format(name, moodleid) + longfolder = os.path.join(tmpfolder, folder) + + # Create folder + if not dry: + os.mkdir(longfolder) + + # Copy all files to folder + for longpdffile in longpdffiles: + pdffile = os.path.basename(longpdffile) + + if not dry: + shutil.copyfile(longpdffile, + os.path.join(longfolder, pdffile)) + else: + dryout += "\n{}".format(os.path.join(folder, pdffile)) + else: + if not nowarn: + print("Warning: PDF corresponding to matnumber {} (moodleid={}, name={}) not available.".format( + matnum, moodleid, name + )) + + # Print progress + if not (line_cnt % max(1, round(numlines/10))): + print(".", sep=' ', end='', flush=True) + line_cnt += 1 + + # Print results + print("Found {} PDFs (CSV had {} entries)".format(numfoundpdfs, numlines)) + print("done.") + + # Sanity check: + # Check for PDFs not reflected in CSV (student not registered in Moodle) + find_unmatched_pdfs(infolder, matnums, nowarn) + + # Zipping + if not dry: + # Zip + print("Zipping") + shutil.make_archive(os.path.splitext(outzip)[0], 'zip', tmpfolder) + print('The Zip archive is available at: '+outzip) + + # Delete temporary folder + shutil.rmtree(tmpfolder) + + else: + print("\nDry run results:\n{}".format(dryout)) + + endtime = time.time() + + print("""Done. +Time taken: {:.2f}""".format(endtime-starttime)) - return result if __name__ == '__main__': - - parser = argparse.ArgumentParser(description=''' - prepares batch upload to Moodle via assignment module. - PDFs in folder 'in' are moved to folder 'tmp' with a certain folder structure and finally zipped to 'out'. - Attention: zip-archive 'out' will be overwritten in the following! - - ''') - parser.add_argument("-i", "--infolder", default="./pdfs_encrypted", - help="Input folder with PDFs. Default: ./pdfs_encrypted") - parser.add_argument("-c", "--csv", default="./Bewertungen.csv", - help="Moodle grading CSV file, needed to construct the folder names. Default: ./Bewertungen.csv") - parser.add_argument("-o", "--outzip", default="./moodle_feedbacks.zip", - help="Output zip archive. Default: ./moodle_feedbacks.zip") - parser.add_argument("-d", "--dry", action='store_true', - help="Flag for dry run, displays only the folder structure inside the archive moodle_feedbacks.zip") - parser.add_argument("-t", "--tmp", default="./tmp", - help="tmp folder. Default: ./tmp") - parser.add_argument("--nowarn", action='store_true', - help="Disables warnings") - parser.add_argument("-b","--batch", default="0", - help="Check whether it runs through batch script or not. Default: 0") - - args = parser.parse_args() - infolder = args.infolder - csvfilename = args.csv - outzip = args.outzip - tmpfolder = args.tmp - dry = args.dry - nowarn = args.nowarn - batch_process = int(args.batch) - - numlines = 0 - starttime = time.time() - with open(csvfilename, newline='') as csvfile: - numlines = sum(1 for line in csvfile) - - print('''Preparing for moodle upload - Processing {} lines - '''.format(numlines)) - - if dry: - print("Dry run\n") - dryoutput="" - else: - if batch_process == 0: - for root, dirs, files in os.walk(tmpfolder): - for f in files: - os.unlink(os.path.join(root, f)) - for d in dirs: - shutil.rmtree(os.path.join(root, d)) - - - if os.path.exists(outzip): os.remove(outzip) - - - with open(csvfilename, newline='') as csvfile: - - # Loop over all lines in CSV file - numfoundpdfs = 0 - cnt = 0 - print("Start iterating...", sep='', end='', flush=True) - - reader = csv.reader(csvfile, delimiter=',', quotechar='"') - next(reader) # skip first row in CSV file since this should be the header - for row in reader: - # parse the required fields from the csv file - id = row[0] - id = id.replace("Teilnehmer/in", "") - id = id.replace("Participant ", "") - name = row[1] - matnum = row[2] - - # if a pdf file for current student exists, create a directory and copy - # the pdf file to it. The resulting directories can be uploaded to moodle - longpdffile = '' - paths = find_file(matnum + "*.pdf", infolder) - - if len(paths) > 0: - longpdffile = paths[0] - if len(paths) > 1: # TODO: implement second loop for enabling distribution of multiple files - raise Exception("More than one PDFs starting with matnum {} found!".format(matnum)) - - - if os.path.isfile(longpdffile): - numfoundpdfs += 1 - pdffile = os.path.basename(longpdffile) - folder = "{}_{}_assignsubmission_file_".format(name, id) - longfolder = os.path.join(tmpfolder, folder) - - if not dry: - os.mkdir(longfolder) - shutil.copyfile(longpdffile, os.path.join(longfolder, pdffile)) - else: - dryoutput += "\n{}".format(os.path.join(folder, pdffile)) - else: - if not nowarn: - print("Warning: PDF corresponding to matriculation number {} (id={}, name={}) not available.".format( - matnum, id, name - )) - - # Progress - if not (cnt % max(1,round(numlines/10))): - print(".", sep=' ', end='', flush=True) - cnt += 1 - - print("done.\n") - - print("Found {} PDFs (CSV had {} entries)\n".format(numfoundpdfs, numlines)) - - print("Searching for matriculation numbers not present in CSV but in PDF input folder:") - - # Check for PDFs which are not reflected in CSV (student not registered in Moodle) - numnotfoundmatnums = 0 - notfoundmatnums = "" - - for root, dirs, files in os.walk(infolder): - for pdffile in files: - if pdffile.endswith(".pdf"): - # Get matriculation number from file - matnum = pdffile[0:6] - - # Search in CSV - with open(csvfilename, 'r') as csvfile: - notfound = True - for line in csvfile: - if matnum in line: - notfound = False - - if notfound: - numnotfoundmatnums += 1 - notfoundmatnums += matnum + ", " - if not nowarn: - print("Warning: Could not find {} in CSV".format(matnum)) - - if numnotfoundmatnums > 0: - print('''I could not find the following {} matriculation numbers in CSV: - {}'''.format(numnotfoundmatnums, notfoundmatnums)) - - print("Done.") - - - # Zipping - if not dry: - print("Zipping") - - shutil.make_archive(os.path.splitext(outzip)[0], 'zip', tmpfolder) - - else: - print("\nResults from dry ryn:\n{}".format(dryoutput)) - - - print("\nDone.\n") - - endtime = time.time() - print('\n The Zip archive is available at: '+outzip) - print(f'\nTime taken: {endtime-starttime:.2f}s\n') \ No newline at end of file + main(sys.argv[1:]) diff --git a/supplements.py b/supplements.py new file mode 100755 index 0000000000000000000000000000000000000000..155606c482fb194af1270a613c06c0684d2d5cd7 --- /dev/null +++ b/supplements.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python + +"""Prepare supplement material + +Given a folder with exam scans, this script copies supplementary material (such +as exam or sample solution) to have the same prefix (e.g. +"[matnum]_[lastname]") as the exam scan to be ready for watermarking / moodle +upload. +""" + +import sys # get arguments from command line +import os # path listing/manipulation/... +import time # keep track of time +import argparse # handle command line arguments +import shutil # copy + +import utils.matnum as utils + + +def copy_supplements(supp_dir, output_dir, pdf_files, dry=False): + """Copy supplement files + + Args: + supp_dir (str): path to supplement folder + output_dir (str): path to output folder + pdf_files (list): list of pdf files + dry (bool): indicate dry run + """ + + dryout = [] + if dry: + print("Dry run\n") + + # Iterate over supplement files + supp_files = os.listdir(supp_dir) + cnt = 0 + num_files = len(supp_files)*len(pdf_files) + copied_files = [] + for supp_file in supp_files: + supp_filefull = os.path.join(supp_dir, supp_file) + supp_stem = os.path.splitext(supp_file)[0] # filename without .pdf + + # Iterate over scanned PDF files + for pdf_file in pdf_files: + prefix = os.path.splitext(pdf_file)[0] + new_file = prefix + "_" + supp_stem + ".pdf" + new_filefull = os.path.join(output_dir, new_file) + + # Copy + if not dry: + shutil.copyfile(supp_filefull, new_filefull) + else: + dryout.append(new_file) + copied_files.append(new_file) + + # Print progress + if not (cnt % max(1, round(num_files/10))): + print(".", sep=' ', end='', flush=True) + cnt += 1 + + # Display dry run results + if dry: + dryout.sort() + print("\nDry run results:\n{}".format("\n".join(dryout))) + + return copied_files + + +def main(args): + """Main function + + For all PDFs in ./pdfs folder: + 1) Convert each page of the PDFs into image + 2) Watermark each image + 3) Convert each image into single page PDFs + 4) Merge PDFs to one combined PDF + """ + + # Argument handling + parser = argparse.ArgumentParser(description=''' + PDFs of exam scans from folder 'in' are watermarked with the + matriculation number of the respective student. + Watermarked PDFs are stored in folder 'out' + ''') + parser.add_argument("-s", "--supplementfolder", default="./supplements", + help="Folder with supplements. Default: ./supplements") + parser.add_argument("-p", "--pdffolder", default="./pdfs", + help="PDF folder with scanned PDFs. Default: ./pdfs") + parser.add_argument("-o", "--outfolder", default="./supplements_out", + help="Output folder. Default: ./supplements_out") + parser.add_argument("-d", "--dry", action='store_false', + help="Flag for dry run") + + args = parser.parse_args(args) + supp_dir = args.supplementfolder + pdf_dir = args.pdffolder + output_dir = args.outfolder + dry = args.dry + + # Print status + starttime = time.time() + + pdf_folder = os.listdir(pdf_dir) + pdf_files = [_ for _ in pdf_folder + if _.endswith(".pdf") and utils.check_matnum(_[0:6])] + + copied_files = copy_supplements(supp_dir, output_dir, pdf_files, dry) + + # Print status + endtime = time.time() + print("""All PDFs are watermarked and can be found in {} folder: +Time taken: {:.2f}s + """.format(output_dir, endtime-starttime)) + + return copied_files + + +if __name__ == '__main__': + main(sys.argv[1:]) diff --git a/pdfs/123456_GDET3_20H.pdf b/supplements/GDET3_20H.pdf similarity index 100% rename from pdfs/123456_GDET3_20H.pdf rename to supplements/GDET3_20H.pdf diff --git a/pdfs/123456_GDET3_20H_loes.pdf b/supplements/GDET3_20H_loes.pdf similarity index 100% rename from pdfs/123456_GDET3_20H_loes.pdf rename to supplements/GDET3_20H_loes.pdf diff --git a/supplements_out/.gitkeep b/supplements_out/.gitkeep new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/test_supplements.py b/tests/test_supplements.py new file mode 100644 index 0000000000000000000000000000000000000000..e022c319d7afa3c49e359fd417b5e1fca1568279 --- /dev/null +++ b/tests/test_supplements.py @@ -0,0 +1,51 @@ +import unittest +import time +import os +import tempfile +import shutil + + +class MainTest(unittest.TestCase): + def setUp(self): + self.tic = time.time() # todo this is sooo ugly + + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + self.toc = time.time() + t = self.toc - self.tic + print('Time: %.3f' % (t)) + + def test_supplements_watermark(self): + import supplements + import watermark + import utils.matnum as utils + + expected_files = ['123456_Nachname_GDET3_20H_loes_w.pdf', '123456_Nachname_GDET3_20H_w.pdf', + '456789_Lastname_GDET3_20H_loes_w.pdf', '456789_Lastname_GDET3_20H_w.pdf'] + + # Prepare parameter + supp_dir = './supplements' + pdf_dir = './pdfs' + dpi = 250 + + supp_out_dir = os.path.join(self.test_dir, 'supplements_out') + os.mkdir(supp_out_dir) + + tmp_dir = os.path.join(self.test_dir, 'tmp') + os.mkdir(tmp_dir) + + out_dir = os.path.join(self.test_dir, 'out') + os.mkdir(out_dir) + + # Copy supplements file + supplements.main(["-s", supp_dir, "-p", pdf_dir, "-o", supp_out_dir]) + + # Watermark files + watermark.main(["-i", supp_out_dir, "-o", out_dir, + "-t", tmp_dir, "--dpi", str(dpi)]) + + # Assert output + created_files = os.listdir(out_dir) + created_files.sort() + self.assertEqual(expected_files, created_files) diff --git a/tests/test_watermark.py b/tests/test_watermark.py new file mode 100644 index 0000000000000000000000000000000000000000..e1f24bfe81606849bdb7c305f0f3df8d58d72b84 --- /dev/null +++ b/tests/test_watermark.py @@ -0,0 +1,57 @@ +import unittest +import time +import os +import tempfile +import shutil + + +class MainTest(unittest.TestCase): + def setUp(self): + self.tic = time.time() # todo this is sooo ugly + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + self.toc = time.time() + t = self.toc - self.tic + print('Time: %.3f' % (t)) + + # Clean up + shutil.rmtree(self.test_dir) + + def test_watermark_single_pdf(self): + import watermark + + # Prepare parameter + in_dir = './pdfs' + dpi = 250 + pdf_file = '123456_Nachname.pdf' + + tmp_dir = os.path.join(self.test_dir, 'tmp') + os.mkdir(tmp_dir) + + out_dir = os.path.join(self.test_dir, 'out') + os.mkdir(out_dir) + + # Call function + watermark.watermark_pdf(in_dir, tmp_dir, out_dir, dpi, pdf_file) + + self.assertTrue(os.listdir(out_dir)[0], '123456_Nachname_w.pdf') + + def test_watermark_pdfs(self): + import watermark + + # Prepare parameter + in_dir = './pdfs' + dpi = 250 + + tmp_dir = os.path.join(self.test_dir, 'tmp') + os.mkdir(tmp_dir) + + out_dir = os.path.join(self.test_dir, 'out') + os.mkdir(out_dir) + + # Watermark files + watermark.main(["-i", in_dir, "-o", out_dir, + "-t", tmp_dir, "--dpi", str(dpi)]) + + self.assertTrue(True) diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/utils/matnum.py b/utils/matnum.py new file mode 100644 index 0000000000000000000000000000000000000000..baca019a6c01b17af3c2acdf0cdfd934a439de44 --- /dev/null +++ b/utils/matnum.py @@ -0,0 +1,65 @@ +import os + + +def find_file(pattern, path): + """Finds file given pattern + + Args: + pattern (str): pattern + path (str): path to folder + + Returns: + list: list of filenames in folder matching pattern + """ + + if os.name == "posix": + import subprocess + + result = [line[2:] for line in subprocess.check_output( + "find " + path + " -type f -name " + pattern, + shell=True).splitlines()] + result = [tmp.decode("utf-8") for tmp in result] + + else: + import fnmatch + + result = [] + for root, _, files in os.walk(path): + for name in files: + if fnmatch.fnmatch(name, pattern): + result.append(os.path.join(root, name)) + + return result + + +def check_matnum(matnum): + """Checks for valid matriculation number + + Args: + matnum (str): matriculation number + + Returns: + bool: valid + """ + return len(matnum) == 6 and matnum.isdigit() + + +def get_matnum(s): + """Extracts matriculation number from string + + Args: + s (str): file name with first 6 characters matriculation number + + Returns: + str: 6-digit matriculation number + """ + + # Get matriculation number + # Has to be separated by "_" from the rest of the file name + matnum = s.split('_', 1)[0] + + # Sanity check + if not check_matnum(matnum): + raise ValueError("{} not a valid matriculation number".format(matnum)) + + return matnum diff --git a/watermark.py b/watermark.py index 60e1ef9cda24b6b28fea2bb75b0b549dbba9d805..bfd49d7cbdccc4b6acf7425cf51be49bd255c0a6 100755 --- a/watermark.py +++ b/watermark.py @@ -14,15 +14,12 @@ import time # keep track of time import argparse # handle command line arguments from multiprocessing import Pool # multi processing from functools import partial - -# TODO decide wand vs pdf2image from wand.image import Image as wi # PDF to images -import io # converting wand image to pillow image -from pdf2image import convert_from_path - from PIL import Image, ImageDraw, ImageFont # Image handling from PyPDF2 import PdfFileMerger, PdfFileReader # PDF handling +import utils.matnum as utils + def convert_pdf_to_img(pdf_file, input_dir, tmp_dir, dpi): """Converts all pages from a PDF to single images @@ -60,46 +57,6 @@ def convert_pdf_to_img(pdf_file, input_dir, tmp_dir, dpi): return img_files -def convert_pdf_to_img_new(pdf_file, input_dir, tmp_dir, dpi): - - # PDF path - pdf_path = os.path.join(input_dir, pdf_file) - - # Output path - img_base = os.path.splitext(pdf_file)[0] + '_' - convert_from_path(pdf_path, dpi=dpi, output_folder=tmp_dir, - fmt='png', output_file=img_base) - - # Iterate over pages and store them as image - img_files = os.listdir(tmp_dir) - img_files = [os.path.join(tmp_dir, _) - for _ in img_files if _.startswith(img_base)] - img_files.sort() - - return img_files - - -def get_matnum(s): - """Extracts matriculation number from string - - Args: - s (str): file name with first 6 characters matriculation number - - Returns: - str: 6-digit matriculation number - """ - - # Get matriculation number - # Has to be separated by "_" from the rest of the file name - matnum = s.split('_', 1)[0] - - # Sanity check - if len(matnum) != 6 or not matnum.isdigit(): - raise ValueError("{} not a valid matriculation number".format(matnum)) - - return matnum - - def create_watermark_template(img_file, matnum, dpi): """Creates transparent image with repeated matriculation number @@ -229,7 +186,7 @@ def watermark_pdf(input_dir, tmp_dir, output_dir, dpi, pdf_file): img_files = convert_pdf_to_img(pdf_file, input_dir, tmp_dir, dpi) # Extracting matriculation numebers - matnum = get_matnum(pdf_file) + matnum = utils.get_matnum(pdf_file) # Watermarking PDF page images # Create template for first page @@ -286,7 +243,8 @@ def main(args): # Print status starttime = time.time() pdf_folder = os.listdir(infolder) - pdf_files = [_ for _ in pdf_folder if _.endswith(".pdf")] + pdf_files = [_ for _ in pdf_folder + if _.endswith(".pdf") and utils.check_matnum(_[0:6])] print(""" Available PDFs to be watermarked: - {}