preparemoodleupload.py 9.79 KB
Newer Older
1
2
#!/usr/bin/env python

3
4
5
6
7
8
9
10
11
12
13
14
"""Prepares batch upload to Moodle's assignment module.

PDFs in folder 'in' are moved to a certain folder structure to be recognized
by moodle and finally zipped to 'outzip'.

Attention: Zip-archive 'outzip' will be overwritten in the following!
"""

__author__ = "Amrita Deb (deb@itc.rwth-aachen.de), " +\
    "Christian Rohlfing (rohlfing@ient.rwth-aachen.de)"


15
import os
16
import time  # keep track of time
17
import shutil  # copyfile, make_archive
18
import argparse  # argument parsing
19
import sys
Deb's avatar
Deb committed
20
import zipfile
21
22

import utils.moodle as moodle
23
import utils.matnum as matnum_utils
24

Deb's avatar
Deb committed
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

def zip_folders(base_folder, zip_file, size_limit):
    """Zip folders in base folder. If size limit is exceeded, create next zip
    file until all folders are zipped.

    The first archive is written to ``zip_file`` itself. Every further
    archive is named ``<zip_file without extension>_<n>.zip`` with ``n``
    starting at 2. A folder is never split across two archives.

    Whether a folder still fits is decided from the compressed size already
    written to the current archive plus the *uncompressed* size of the next
    folder, i.e. the check is conservative.

    Args:
        base_folder (str): path of base folder
        zip_file (str): path of zip file
        size_limit (int): size limit in MiB

    Returns:
        int: number of zip files (0 if base folder contains no entries)
    """

    # Initialize
    zip_file_base = os.path.splitext(zip_file)[0]
    total_compress_size = 0
    zip_cnt = 0
    zf = None

    all_folders = os.listdir(base_folder)
    num_folders = len(all_folders)

    try:
        # Iterate over folders
        for cnt, folder in enumerate(all_folders):
            # Measure uncompressed folder size in MiB
            folder_path = os.path.join(base_folder, folder)
            folder_size = get_folder_size(folder_path) / 1024**2

            # If size_limit reached (or nothing opened yet), start a new zip
            if zf is None or total_compress_size + folder_size > size_limit:
                zip_cnt += 1
                if zf is not None:
                    zf.close()  # Close previous zip file
                    zip_file = "{zip}_{cnt}.zip".format(
                        zip=zip_file_base, cnt=zip_cnt)

                # Reset counter and open (new) zip file
                total_compress_size = 0
                zf = zipfile.ZipFile(
                    zip_file, mode='w', compression=zipfile.ZIP_DEFLATED)

            # Remember where the entries of the current folder start
            first_new_entry = len(zf.filelist)
            for f in os.listdir(folder_path):
                zf.write(
                    os.path.join(folder_path, f),
                    arcname=os.path.join(folder, f))

            # Compressed size (in MiB) of the entries just added
            folder_compress_size = sum(
                entry.compress_size
                for entry in zf.filelist[first_new_entry:])
            total_compress_size += folder_compress_size / 1024**2

            # Print for-loop progress
            if not (cnt % max(1, round(num_folders/10))):
                print(".", sep=' ', end='', flush=True)
    finally:
        # Clean up: no archive exists if base folder was empty, and an open
        # archive must be closed even if writing failed half-way
        if zf is not None:
            zf.close()

    print("done.")

    return zip_cnt
Deb's avatar
Deb committed
94
95


Deb's avatar
Deb committed
96
97
98
99
100
101
102
103
def get_folder_size(path):
    """Get size in bytes of folder

    Sums the sizes of all files in the folder and, recursively, in all of
    its sub-folders. Entries that are neither a file nor a directory (for
    example a dangling symbolic link) do not contribute to the total.

    Args:
        path (str): path of folder

    Returns:
        int: number of bytes
    """
    total = 0
    for entry in os.scandir(path):
        if entry.is_file():
            total += entry.stat().st_size
        elif entry.is_dir():
            total += get_folder_size(entry.path)
        # Anything else is skipped on purpose: it has no size to add, and
        # must neither crash nor re-use the size of the previous entry

    return total
Deb's avatar
Deb committed
115

116

117
118
119
120
def sanity_check(matnums_csv, matnums_folder):
    """Check two cases for sanity:
    - Are there PDF files with no corresponding CSV entries?
    - Are there CSV entries with no provided PDF file?

    A warning listing the affected matnums is printed for each case that
    occurs. Both lists are sorted so that the printed report and the
    returned values are the same on every run.

    Args:
        matnums_csv (list): Matnums of all CSV entries
        matnums_folder (list): Matnums of all provided PDF files

    Returns:
        tuple: two sorted lists ``(notfoundcsv, notfoundpdf)``: matnums with
        a PDF but no CSV entry, and matnums with a CSV entry but no PDF
    """
    csv_set = set(matnums_csv)
    folder_set = set(matnums_folder)

    # PDF files with no entry in CSV:
    notfoundcsv = sorted(folder_set - csv_set)

    # Entries in CSV without PDF file
    notfoundpdf = sorted(csv_set - folder_set)

    # Report back
    if notfoundcsv:
        print("Warning: Following {} matnums have PDFs but no entry in CSV:\n"
              "            {}".format(
                  len(notfoundcsv), ", ".join(map(str, notfoundcsv))))

    if notfoundpdf:
        print("Warning: Following {} matnums have CSV entries but no PDF:\n"
              "            {}".format(
                  len(notfoundpdf), ", ".join(map(str, notfoundpdf))))

    print("Done.\n")

    return notfoundcsv, notfoundpdf

146

147
148
149
def _make_parser():
    """Create the command line parser of this script.

    The parser returned by ``moodle.get_moodle_csv_parser()`` is used as
    parent parser, so its options are available as well (presumably the
    CSV related options such as delimiter, quote character and encoding;
    verify against ``utils.moodle``).

    Returns:
        argparse.ArgumentParser: parser with positional arguments
        ``infolder``, ``csv`` and ``outzip`` and options ``--dry``,
        ``--tmp``, ``--nowarn`` and ``--moodleuploadlimit``
    """
    csv_parser = moodle.get_moodle_csv_parser()
    parser = argparse.ArgumentParser(
        parents=[csv_parser], prog='preparemoodleupload.py',
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # Positional arguments
    parser.add_argument(
        "infolder", help="Input folder with PDFs.")
    parser.add_argument(
        "csv", help="Moodle grading sheet.")
    parser.add_argument(
        "outzip", help="Zip archive with feedback files.")

    # Optional arguments
    parser.add_argument(
        "-d", "--dry", action='store_true', help="Flag for dry run.")
    parser.add_argument(
        "-t", "--tmp", default="./tmp", help="Temporary folder.")
    parser.add_argument(
        "--nowarn", action='store_true', help="Disables warnings.")
    # type=int makes argparse reject non-integer limits with a usage error.
    # The default stays a string (argparse converts string defaults with
    # `type` when parsing), so the registered default value is unchanged.
    parser.add_argument(
        "--moodleuploadlimit", default="250", type=int,
        help="Moodle upload limit in MiB.")

    return parser


# Create argument parser with default values
Christian Rohlfing's avatar
Christian Rohlfing committed
175
# Module-level parser instance; main() parses its arguments with it
_parser = _make_parser()
176

177

178
179
180
181
182
def main(args):
    """Main routine

    Sorts the PDFs of the input folder into one submission folder per
    student listed in the Moodle grading sheet and zips these folders into
    one or more archives. In a dry run nothing is created, copied or
    zipped; the planned copy operations are only printed.

    Args:
        args (list): command line arguments (without program name), see
            ``_make_parser`` for the available arguments
    """

    # Parse input arguments
    args = _parser.parse_args(args)
    infolder = args.infolder
    sheet_csv = args.csv
    outzip = args.outzip
    tmp_folder = os.path.join(args.tmp, "to_be_zipped_for_moodle")
    dry = args.dry
    no_warn = args.nowarn
    csv_delim = args.csvdelim
    csv_quote = args.csvquote
    csv_enc = args.csvenc
    size_limit = int(args.moodleuploadlimit)  # Moodle upload size limit in MiB

    # Print status
    starttime = time.time()
    num_students = moodle.get_student_number(sheet_csv=sheet_csv,
                                             csv_enc=csv_enc)

    print("Preparing for moodle upload\nProcessing {} students"
          .format(num_students))

    # Clean up and create output and temporary folders
    dryout = []
    if dry:
        print("Dry run")
    else:
        # Create folder of zip file if needed. dirname() is empty if outzip
        # has no directory part; then there is nothing to create (and the
        # zip file name itself must not be turned into a directory).
        zip_dir = os.path.dirname(outzip)
        if zip_dir and not os.path.exists(zip_dir):
            os.makedirs(zip_dir)

        # Remove zip file
        if os.path.exists(outzip):
            os.remove(outzip)

        # Create temporary folder within given temporary directory;
        # makedirs also creates the temporary directory itself if missing
        if os.path.isdir(tmp_folder):
            shutil.rmtree(tmp_folder)
        os.makedirs(tmp_folder)

    # Parse input folder
    # Only PDF files are considered with first digits
    # containing matriculation number
    # PDFs are grouped by matriculation number once, so that the loop over
    # the students below does not have to rescan all files per student
    matnums_folder = []
    pdfs_by_matnum = {}
    for f in sorted(os.listdir(infolder)):
        if f.lower().endswith('.pdf') and matnum_utils.starts_with_matnum(f):
            pdf_matnum = matnum_utils.get_matnum(f)
            matnums_folder.append(pdf_matnum)
            pdfs_by_matnum.setdefault(pdf_matnum, []).append(f)

    # Parse grading infos from CSV file
    infos = moodle.extract_info(sheet_csv=sheet_csv, csv_delim=csv_delim,
                                csv_quote=csv_quote, csv_enc=csv_enc)

    # Loop over grading infos
    num_found_pdfs = 0
    matnums_csv = []
    if no_warn:
        # Progress dots are appended to this line
        print("Start copying", sep=' ', end='', flush=True)
    else:
        print("Start copying")

    for cnt, info in enumerate(infos):
        # Copy PDF files
        # Find all PDFs starting with matriculation number, e.g.
        # '123456_Lastname_sheet.pdf' and '123456_Lastname_exam.pdf'
        # If pdf files for current student exists, create a directory and
        # copy the pdf files to it. The resulting directories can be
        # uploaded to Moodle
        matnum = info['matnum']
        matnums_csv.append(matnum)
        moodleid = info['moodleid']

        pdfs_student = pdfs_by_matnum.get(matnum, [])
        if pdfs_student:  # Found at least one pdf
            num_found_pdfs += len(pdfs_student)

            # Prepare submission folder
            folder = moodle.submission_folder_name(info)
            longfolder = os.path.join(tmp_folder, folder)

            # Create folder
            if not dry:
                os.mkdir(longfolder)

            # Copy all files to folder
            for pdffile in pdfs_student:
                longpdffile = os.path.join(infolder, pdffile)
                longpdffiledest = os.path.join(longfolder, pdffile)
                if not dry:
                    shutil.copyfile(longpdffile, longpdffiledest)
                else:
                    dryout.append(
                        "- {old} -> {new}"
                        .format(
                            old=pdffile, new=os.path.join(folder, pdffile)))

        elif not no_warn:  # No PDF found
            print("Warning: PDF for {matnum} (id={id}, name={name}) not found."
                  .format(matnum=matnum, id=moodleid, name=info['fullname']))

        # Print for-loop progress
        if no_warn and not (cnt % max(1, round(num_students/10))):
            print(".", sep=' ', end='', flush=True)

    # Print results
    print("done.")
    print("Found {num_pdf} PDFs (CSV had {num_csv} entries)"
          .format(num_pdf=num_found_pdfs, num_csv=num_students))

    # Sanity check:
    # Check for PDFs not reflected in CSV (student not registered in Moodle)
    sanity_check(matnums_csv, matnums_folder)

    # Zip
    if not dry:
        print("Zipping")
        zip_cnt = zip_folders(
            base_folder=tmp_folder, zip_file=outzip, size_limit=size_limit)

        # Remove temporary folder
        shutil.rmtree(tmp_folder)

        # Print status
        print("{cnt} zip archives are stored ({zip}*)"
              .format(cnt=zip_cnt, zip=os.path.splitext(outzip)[0]))

    # Print dry run results
    else:
        dryout.sort()
        print("\nDry run results:\n{}".format("\n".join(dryout)))

    # Print status
    endtime = time.time()
    print("Time taken: {:.2f}".format(endtime-starttime))
322
323


324
# Main routine
325
if __name__ == '__main__':
    # Executed as a script: pass the command line arguments (without the
    # program name) on to main()
    main(sys.argv[1:])