Commit f257473c authored by murban's avatar murban
Browse files

Renamed vfs to filesystem and small changes

parent e0ee0cfc
# @package FileSystem
# Imports
import os
from datetime import datetime
from distutils import dir_util
import locale
import shutil
import random
import cStringIO
from mimetypes import guess_type
import shutil
from zipfile import ZipFile
import xmlrpclib
class FileSystem(object):
FILE_EXTENSIONS = ["png", "jpg", "jpeg", "bmp", "ps", "eps", "pdf", "txt", "xml", "py", "c", "cpp", "root", "pxlio"]
BROWSER_EXTENSIONS = ["png", "jpg", "jpeg", "bmp", "pdf"]
ADDITIONAL_MIMES = {"pxlio": "text/plain",
"root" : "text/plain"}
def __init__(self):
# allowed extensions
self.allowed_extensions = FileSystem.FILE_EXTENSIONS
def setup(self, basedir, username):
# the data path
self.data_path = os.path.join(basedir, "data")
# the main paths
self.user_path = os.path.join(self.data_path, "user")
self.public_path = os.path.join(self.data_path, "public")
self.shared_path = os.path.join(self.data_path, "shared")
self.examples_path = os.path.join(self.data_path, "examples")
self.exercises_path = os.path.join(self.data_path, "exercises")
paths = [self.data_path, self.user_path, self.public_path, self.shared_path, self.examples_path, self.exercises_path]
for path in paths:
if not os.path.isdir(path):
os.makedirs(path, 0700)
if username is None or username == "":
return
upath = os.path.join(self.user_path, username)
if not os.path.isdir(upath):
os.makedirs(upath, 0700)
upath = os.path.join(self.public_path, username)
if not os.path.isdir(upath):
os.makedirs(upath, 0700)
def getMimeType(self, filepath):
mime, encoding = guess_type(filepath)
if mime is not None:
return mime
ext = filepath.split(".")[-1]
if ext is not None and ext != "" and ext.lower() in FileSystem.ADDITIONAL_MIMES.keys():
return FileSystem.ADDITIONAL_MIMES[ext]
return None
def checkFileExtension(self, path, extensions=[]):
if (len(extensions) == 0):
return True
for elem in extensions:
elem = elem if elem.startswith(".") else "." + elem
if path.lower().endswith(elem.lower()):
return True
return False
def exists(self, username, path, pathtype=None, fileextension=None, permissionmode="r"):
if isinstance(permissionmode, str) and permissionmode.lower() not in ["r", "w"]:
return False
# path exists physically?
if not os.path.exists(path):
return False
# pathtype correct? (file, folder, or None)
if pathtype is not None:
pathtype = pathtype.lower()
if not pathtype in ["file", "folder"]:
return False
if os.path.isdir(path) and pathtype == "file":
return False
elif not os.path.isdir(path) and pathtype == "folder":
return False
# file extension correct?
if fileextension is not None:
fileextension = fileextension if isinstance(fileextension, list) else [fileextension]
hit = False
for fileext in fileextension:
fileext = fileext if fileext.startswith(".") else "." + fileext
if path.endswith(fileext):
hit = True
if not hit:
return False
# permission?
if permissionmode is not None:
return False
return True
def getFileList(self, username, path, deep, ext_filter=[], reverse=False):
dirlist = os.listdir(path)
returnlist = []
for elem in dirlist:
fullPath = os.path.join(path, elem)
locale.setlocale(locale.LC_ALL, '')
stats = os.stat(path + ('' if path.endswith('/') else '/') + elem)
size = stats.st_size
size = locale.format("%d", size, grouping=True)
mtime = stats.st_mtime
mtime = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M:%S")
if os.path.isdir(fullPath):
if elem.startswith("."):
continue
returnlist.append({'name': elem, 'type': 'folder', 'parent': path + '/', 'extension': '', 'mtime': mtime, 'size': size, "path": os.path.join(path, elem)})
if deep:
returnlist.extend(self.getFileList(username, fullPath, deep, ext_filter, reverse))
else:
if elem.startswith("."):
continue
extension = elem.split(".")[-1]
returnlist.append({'name': elem, 'type': 'file', 'parent': path + '/', 'extension': extension, 'mtime': mtime, 'size': size, "path": os.path.join(path, elem)})
# apply filters
filelist = []
# delete "." in file extensions
ext_filter = map(lambda ext: ext if not ext.startswith(".") else ext[1:], ext_filter)
for elem in returnlist:
if (elem["type"] == "folder"):
filelist.append(elem)
continue
if not reverse:
if str(elem["extension"]) not in ext_filter:
filelist.append(elem)
else:
if str(elem["extension"]) in ext_filter:
filelist.append(elem)
return filelist
def cutSlashs(self, path):
path = path[1:] if path.startswith("/") else path
if path == "":
return path
path = path[:-1] if path.endswith("/") else path
return path
def createFolder(self, path, name):
# folder with the same name existent?
fullpath = os.path.join(path, name)
if os.path.isdir(fullpath):
raise Exception("Name already in use!")
try:
os.mkdir(fullpath)
except Exception as e:
#raise Exception("You don't have the permission to create this folder!")
raise Exception(str(e))
def createFile(self, path, name):
# file with the same name existent?
fullpath = os.path.join(path, name)
if os.path.exists(fullpath):
raise Exception("Name already in use!")
try:
f = file(fullpath, "w")
f.close()
except Exception as e:
raise Exception(str(e))
def renameFolder(self, path, name):
# file or folder
if not os.path.isdir(path):
raise Exception("Renaming file with folder function!")
# folder with the same name existent?
path = path if not path.endswith("/") else path[:-1]
fullpath = os.path.join("/".join(path.split("/")[:-1]), name)
if os.path.exists(fullpath):
raise Exception("Name already in use!")
os.renames(path, fullpath)
def renameFile(self, path, name):
# file or folder
if os.path.isdir(path):
raise Exception("Renaming folder with file function!")
# file with the same name existent?
fullpath = os.path.join("/".join(path.split("/")[:-1]), name)
if os.path.exists(fullpath):
raise Exception("Name already in use!")
os.renames(path, fullpath)
def remove(self, vpath):
if isinstance(vpath, list):
for p in path:
self.remove(p)
return True
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.remove(path)
def compress(self, path, paths, name):
# paths has to be a list of strings
paths = paths if isinstance(paths, list) else [paths]
path = path if not path.endswith("/") else path[:-1]
fullpath = os.path.join(path, "%s.zip" % name)
if os.path.exists(fullpath):
raise Exception("Name already in use!")
archive = ZipFile(fullpath, "w")
vpath = vpath[1:] if vpath.startswith("/") else vpath
for p in paths:
if p is None or p == "":
continue
p = p[1:] if vp.startswith("/") else p
if os.path.isdir(p):
for elem in os.listdir(p):
fullp = os.path.join(p, elem)
if os.path.isdir(fullp):
paths.append(fullp)
else:
ap = fullp[len(path):] if fullp.startswith(path) else fullp
archive.write(fullpp, ap)
ap = p[len(path):] if p.startswith(path) else p
archive.write(p, ap)
archive.close()
def paste(self, vpath, vtarget, cut):
if isinstance(vtarget, list):
for vp in vtarget:
self.paste(username, vpath, vp, cut)
return True
# permission?
if not self.checkPermission(username, vpath, 'w'):
raise Exception("You don't have the permission in this folder!")
ppath = self.translateVirtualPath(username, vpath)
ptarget = self.translateVirtualPath(username, vtarget)
fulltarget = os.path.join(ppath, ptarget.split("/")[-1])
if os.path.exists(fulltarget):
raise Exception("Name already in use!")
if os.path.isdir(ptarget):
shutil.copytree(ptarget, fulltarget)
if cut:
shutil.rmtree(ptarget)
else:
shutil.copy2(ptarget, ppath)
if cut:
os.remove(ptarget)
def saveFileContent(self, username, vPath, content, force=True):
#check write permissions
if not self.checkPermission(username, vPath, 'w'):
return False, "Permission denied!"
pPath = self.translateVirtualPath(username, vPath)
#check if file already exists
if os.path.exists(pPath) and not force:
return False, "The file '%s' already exists!" % vPath
out = open(pPath, "w")
for line in content.data:
out.write(line)
out.close()
return True, "File saved!"
def getFileContent(self, username, vPath):
# permissions?
if not self.checkPermission(username, vPath, 'r'):
return False, "Permission denied!"
pPath = self.translateVirtualPath(username, vPath)
f = open(pPath, "r")
content = f.read()
f.close()
content = " " if content == "" else content
return xmlrpclib.Binary(content), "File opened!"
def isbrowserfile(self, path):
extension = path.split(".")[-1]
return extension in VirtualFileSystem.BROWSER_EXTENSIONS
def handleFileNameCollision(self, name, pPath):
# collision?
files = os.listdir(pPath)
if name not in files:
return name
# when this line is reached, there is a collision!
# cut the file extension
extension = name.split(".")[-1]
prename = None
if name == extension:
extension = ""
prename = name
else:
prename = name.split("." + extension)[0]
# has the name already a counter at its end?
hasCounter = False
preprename = None
counter = prename.split("_")[-1]
if counter != prename:
try:
counter = int(counter)
hasCounter = True
preprename = "_".join(prename.split("_")[:-1])
except:
pass
if hasCounter:
# increment and try again
counter += 1
newname = "%s_%d%s" % (preprename, counter, "" if extension == "" else "." + extension)
else:
newname = "%s_1%s" % (prename, "" if extension == "" else "." + extension)
# return
return self.handleFileNameCollision(newname, pPath)
"""
def handleDownload(self, username, vPath):
if not self.exists(username, vPath, pathtype="file"):
raise Exception("The file '%s' does not exist or<br />you don't have the permission to read this file."%vPath)
return None
pPath = self.translateVirtualPath(username, vPath)
payload = open(pPath, "r")
output = cStringIO.StringIO()
for line in payload:
output.write(line)
output.seek(0)
data = output.read()
output.close()
# get the content type depending on the file extension
ext = vPath.split(".")[-1]
mimetype = self.getMimeType(vPath)
if mimetype is None:
raise Exception("The file extension '%s' is not supported by this server"%ext)
return None
return data, mimetype, self.isbrowserfile(vPath)
"""
##
# Creates a unique ID
# p = 1e-18
def uniqueId(l=10):
chars = "abcdefghijklmnopqrstuvwxyz"
chars += "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
chars += "0123456789"
code = ""
for i in range(l):
rnd = int(round(random.uniform(0, len(chars) - 1)))
code += chars[rnd]
return code
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment