# -*- coding: utf-8 -*-
"""File system helpers: listing, editing, archiving, thumbnails and watches.

The module exposes the ``FileSystem`` class, which bundles path based file
operations together with file system watches built on ``fsmonitor``. Watch
events are forwarded through ``vispa.remote.send_topic``.
"""

# imports
try:
    from StringIO import StringIO
except ImportError:
    # No ``StringIO`` module available; fall back to a binary in-memory buffer.
    from io import BytesIO as StringIO
from distutils.spawn import find_executable
from mimetypes import guess_type
from zipfile import ZipFile
from threading import Lock
import json
import logging
import os
import re
import shutil
import stat
import subprocess

import fsmonitor
import vispa

# Optional imaging backends: the classic PIL modules are preferred, the
# external 'convert' executable is only probed when they are missing.
try:
    import Image
    import ImageFilter
    HAVE_PIL = True
except ImportError:
    HAVE_PIL = False

# Always define both names so that they can be tested unconditionally.
HAVE_CONVERT = False
convert_executable = None
if not HAVE_PIL:
    convert_executable = find_executable('convert')
    HAVE_CONVERT = bool(
        convert_executable and os.path.isfile(convert_executable))

logger = logging.getLogger(__name__)


class FileSystem(object):
    """Path based file operations plus file system watches.

    All public methods that take a path expand ``~`` and environment
    variables through :meth:`expand` before touching the file system.
    """

    FILE_EXTENSIONS = ["png", "jpg", "jpeg", "bmp", "ps", "eps", "pdf",
                       "txt", "xml", "py", "c", "cpp", "root", "pxlio"]
    BROWSER_EXTENSIONS = ["png", "jpg", "jpeg", "bmp"]
    # mime types for extensions that 'mimetypes.guess_type' may not know
    ADDITIONAL_MIMES = {
        "pxlio": "text/plain",
        "root": "text/plain"
    }

    def __init__(self):
        # allowed extensions
        self.allowed_extensions = FileSystem.FILE_EXTENSIONS
        self._monitor_thread = fsmonitor.FSMonitorThread(
            self._monitor_callback)
        # expanded path -> watch object
        self._monitor_watches = {}
        # (window_id, view_id, watch_id) -> watch object
        self._monitor_listener = {}
        # guards both dictionaries and the per-watch listener lists
        self._monitor_lock = Lock()

    def __del__(self):
        self.close()

    def setup(self, basedir=None):
        """Create the ``.vispa`` directory inside *basedir* if necessary.

        *basedir* defaults to the expanded ``~/``. Raises ``Exception`` when
        *basedir* is not a directory. Returns a status message telling
        whether the directory already existed.
        """
        if basedir is None:
            basedir = self.expand('~/')
        if not os.path.isdir(basedir):
            raise Exception("Basedir '%s' does not exist!" % basedir)
        # the basedir
        self.basedir = os.path.join(basedir, ".vispa")
        if os.path.isdir(self.basedir):
            return "Basedir already exists"
        os.makedirs(self.basedir, 0o700)
        return "Basedir now exists"

    def close(self):
        """Remove all watches and tell the monitor thread to stop."""
        # __del__ may run on an instance whose __init__ failed early, so
        # the attribute is not guaranteed to exist.
        thread = getattr(self, "_monitor_thread", None)
        if thread:
            thread.remove_all_watches()
            thread.running = False

    def get_mime_type(self, filepath):
        """Return the mime type of *filepath* or ``None`` if unknown.

        ``mimetypes.guess_type`` is asked first; ``ADDITIONAL_MIMES`` is the
        fallback, looked up case-insensitively by the text after the last
        dot.
        """
        filepath = self.expand(filepath)
        mime, _encoding = guess_type(filepath)
        if mime is not None:
            return mime
        # The lookup key must be lowered as well, otherwise an upper case
        # extension passes the membership test and then raises KeyError.
        ext = filepath.split(".")[-1].lower()
        return FileSystem.ADDITIONAL_MIMES.get(ext)

    def check_file_extension(self, path, extensions=None):
        """Tell whether *path* ends with one of *extensions*.

        Extensions may be given with or without a leading dot and are
        compared case-insensitively. An empty or missing *extensions*
        accepts every path.
        """
        if not extensions:
            return True
        lowered = self.expand(path).lower()
        for elem in extensions:
            suffix = elem if elem.startswith(".") else "." + elem
            if lowered.endswith(suffix.lower()):
                return True
        return False

    def exists(self, path, type=None):
        """Return ``'d'`` or ``'f'`` if *path* exists, otherwise ``None``.

        When *type* (``'f'`` or ``'d'``, any case) is given, ``None`` is
        also returned if the existing target is of the other kind or if
        *type* is neither of the two letters.
        """
        path = self.expand(path)
        # path exists physically?
        if not os.path.exists(path):
            return None
        target_type = 'd' if os.path.isdir(path) else 'f'
        if not type:
            return target_type
        # An invalid type can never equal 'd' or 'f', so one comparison
        # covers both the validation and the match.
        return target_type if type.lower() == target_type else None

    def get_file_count(self, path, window_id=None, view_id=None,
                       watch_id=None):
        """Return the number of entries in *path*, or ``-1`` if missing.

        If all three ids are given, a watch on *path* is registered first;
        its result is ignored on purpose, see the comment below.
        """
        # inline watch
        if window_id and view_id and watch_id:
            # don't fail atm since it would emit the wrong error message
            self.watch(path, window_id, view_id, watch_id)
        # actual function
        path = self.expand(path)
        if os.path.exists(path):
            return len(os.listdir(path))
        return -1

    def get_file_list(self, path, deep=False, filter=None, reverse=False,
                      hide_hidden=True, encode_json=True,
                      window_id=None, view_id=None, watch_id=None):
        """List the entries of the directory *path*.

        *filter* is an iterable of regular expressions (strings or compiled
        patterns). Entries matching any of them are dropped; with *reverse*
        only matching entries are kept. *hide_hidden* drops names starting
        with a dot. With *deep*, real (non-symlink) sub directories are
        listed recursively and their entries appended to the same flat
        list.

        Every entry is a dict with ``name``, ``size``, ``mtime`` and
        ``type`` (``'d'`` or ``'f'``); symlinks additionally carry
        ``symlink`` and ``realpath``. The result is
        ``{'filelist': [...], 'parentpath': ...}``, JSON encoded unless
        *encode_json* is false. File system errors end the listing early
        and whatever was collected so far is returned.
        """
        # inline watch
        if window_id and view_id and watch_id:
            # don't fail atm since it would not be caught on the client side
            self.watch(path, window_id, view_id, watch_id)

        # actual function
        # re.compile accepts already compiled patterns, which is what the
        # recursive call below hands in.
        patterns = [re.compile(p) for p in filter] if filter else []
        filelist = []
        path_expand = self.expand(path)
        try:
            for elem in os.listdir(path_expand):
                # hide hidden files?
                if hide_hidden and elem.startswith('.'):
                    continue
                # excluded by filters?
                match = any(p.search(elem) for p in patterns)
                if match != reverse:
                    continue
                info = {'name': elem}
                fullpath = os.path.join(path_expand, elem)
                stats = os.lstat(fullpath)
                is_symlink = stat.S_ISLNK(stats.st_mode)
                if is_symlink:
                    realfullpath = os.path.realpath(fullpath)
                    info.update({'symlink': True, 'realpath': realfullpath})
                    # a dangling link keeps the stats of the link itself
                    if os.path.exists(realfullpath):
                        stats = os.stat(realfullpath)
                is_dir = stat.S_ISDIR(stats.st_mode)
                info.update({
                    'size': stats.st_size,
                    'mtime': stats.st_mtime,
                    'type': 'd' if is_dir else 'f'
                })
                filelist.append(info)
                # Only descend into real directories: plain files cannot be
                # listed and symlinks could form cycles. The nested call
                # must return the raw dict, extending with its JSON string
                # would add single characters to the list.
                if deep and is_dir and not is_symlink:
                    nested = self.get_file_list(
                        fullpath, deep, patterns, reverse, hide_hidden,
                        encode_json=False)
                    filelist.extend(nested['filelist'])
        except EnvironmentError as e:
            # best effort: keep what was collected so far
            logger.debug("listing '%s' failed: %s", path_expand, e)

        # Determine the parent
        parentpath = os.path.dirname(path_expand)
        data = {'filelist': filelist, 'parentpath': parentpath}
        return json.dumps(data) if encode_json else data

    def get_suggestions(self, path, length=1, append_hidden=True,
                        encode_json=True):
        """Suggest completions for the partially typed *path*.

        An existing directory without trailing ``/`` is completed to the
        directory itself plus separator; an existing file yields nothing.
        Otherwise the entries of the directory part that start with the
        last path component are suggested, directories with a trailing
        ``/``. At most *length* suggestions are produced, ``0`` means no
        limit. With *append_hidden* the candidates are sorted
        case-insensitively with dot files last. The result is a list, JSON
        encoded unless *encode_json* is false.
        """
        def result(items):
            # single exit format, also for the early returns
            return json.dumps(items) if encode_json else items

        source, prefix = None, None
        # does the path exist?
        path_expanded = self.expand(path)
        if os.path.exists(path_expanded):
            # file case
            if not os.path.isdir(path_expanded):
                return result([])
            # dir case
            if not path.endswith('/'):
                return result([path + os.sep])
            source = path
        else:
            # try to estimate source and filter
            head, tail = os.path.split(path)
            if os.path.isdir(self.expand(head)):
                source, prefix = head, tail

        # return empty suggestions when source is not set
        if not source:
            return result([])

        files = os.listdir(self.expand(source))
        # resort?
        if append_hidden:
            # same order as comparing the lowered names with file_compare:
            # visible names first, dot files last, alphabetical within
            files = sorted(
                (str(f) for f in files),
                key=lambda f: (f.lower().startswith('.'), f.lower()))

        suggestions = []
        for candidate in files:
            if length != 0 and len(suggestions) >= length:
                break
            if prefix and not candidate.startswith(prefix):
                continue
            suggestion = os.path.join(source, candidate)
            if (not suggestion.endswith('/')
                    and os.path.isdir(self.expand(suggestion))):
                suggestion += '/'
            suggestions.append(suggestion)
        return result(suggestions)

    def cut_slashs(self, path):
        """Strip one leading and one trailing separator from *path*."""
        path = self.expand(path)
        if path.startswith(os.sep):
            path = path[1:]
        if path == "":
            return path
        if path.endswith(os.sep):
            path = path[:-1]
        return path

    def create_folder(self, path, name):
        """Create the folder *name* inside *path*.

        A name that is already taken is altered by
        :meth:`handle_file_name_collision`. Raises ``Exception`` carrying
        the message of the underlying error.
        """
        path = self.expand(path)
        name = self.expand(name)
        # folder with the same name existent?
        name = self.handle_file_name_collision(name, path)
        fullpath = os.path.join(path, name)
        try:
            os.mkdir(fullpath)
        except Exception as e:
            raise Exception(str(e))

    def create_file(self, path, name):
        """Create the empty file *name* inside *path*.

        A name that is already taken is altered by
        :meth:`handle_file_name_collision`. Raises ``Exception`` carrying
        the message of the underlying error.
        """
        path = self.expand(path)
        name = self.expand(name)
        # file with the same name existent?
        name = self.handle_file_name_collision(name, path)
        fullpath = os.path.join(path, name)
        try:
            with open(fullpath, "w"):
                pass
        except Exception as e:
            raise Exception(str(e))

    def rename(self, path, name, new_name, force=False):
        """Rename *name* to *new_name*, both located inside *path*.

        Unless *force* is set, a taken *new_name* is altered by
        :meth:`handle_file_name_collision`. Returns ``None`` on success and
        the error message as a string on failure.
        """
        path = self.expand(path)
        name = self.expand(name)
        new_name = self.expand(new_name)
        try:
            if not force:
                new_name = self.handle_file_name_collision(new_name, path)
            os.renames(os.path.join(path, name),
                       os.path.join(path, new_name))
            return None
        except Exception as e:
            return str(e)

    def remove(self, path):
        """Delete *path*; a list or tuple of paths is deleted one by one.

        Directories are removed together with their content.
        """
        if isinstance(path, (list, tuple)):
            for p in path:
                self.remove(p)
            return
        path = self.expand(path)
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.remove(path)

    def compress(self, path, paths, name):
        """Write the files below *paths* into the zip archive *path*/*name*.

        *paths* is a single path or a list/tuple of paths; directories are
        traversed and their files added. ``.zip`` is appended to *name* if
        missing and a taken name is altered by
        :meth:`handle_file_name_collision`. Members located below *path*
        are stored with that prefix removed.
        """
        paths = paths if isinstance(paths, (list, tuple)) else [paths]
        # work queue; sub directories found on the way are appended
        pending = [self.expand(p) for p in paths]
        # drop a trailing separator, but keep a bare root intact
        if len(path) > 1 and path.endswith(os.sep):
            path = path[:-1]
        path = self.expand(path)
        name = name if name.endswith(".zip") else name + ".zip"
        name = self.handle_file_name_collision(name, path)
        fullpath = os.path.join(path, name)

        def add(archive, filepath):
            arcname = (filepath[len(path):]
                       if filepath.startswith(path) else filepath)
            logger.debug(filepath)
            archive.write(filepath, arcname)

        with ZipFile(fullpath, "w") as archive:
            index = 0
            while index < len(pending):
                current = pending[index]
                index += 1
                if not current:
                    continue
                if not os.path.isdir(current):
                    add(archive, current)
                    continue
                for elem in os.listdir(current):
                    fullp = os.path.join(current, elem)
                    if os.path.isdir(fullp):
                        pending.append(fullp)
                    elif fullp != fullpath:
                        # never add the archive that is being written
                        add(archive, fullp)

    def paste(self, path, fullsrc, cut):
        """Copy *fullsrc* into the directory *path*; move it if *cut* is set.

        *fullsrc* is a single path or a list/tuple of paths. A target name
        that is already taken is altered by
        :meth:`handle_file_name_collision`. Returns ``True`` for a
        list/tuple and ``None`` for a single path.
        """
        path = self.expand(path)
        if isinstance(fullsrc, (list, tuple)):
            for p in fullsrc:
                self.paste(path, p, cut)
            return True

        fullsrc = self.expand(fullsrc)
        # a trailing separator would otherwise yield an empty target name
        basename = os.path.basename(fullsrc.rstrip(os.sep))
        target = self.handle_file_name_collision(basename, path)
        fulltarget = os.path.join(path, target)
        if os.path.isdir(fullsrc):
            shutil.copytree(fullsrc, fulltarget)
            if cut:
                shutil.rmtree(fullsrc)
        else:
            shutil.copy2(fullsrc, fulltarget)
            if cut:
                os.remove(fullsrc)

    def save_file(self, path, content, binary=False, force=True):
        """Write *content* to *path*.

        Returns ``False`` without writing when the file exists and *force*
        is not set, ``True`` after a successful write. *binary* selects
        mode ``"wb"`` instead of ``"w"``.
        """
        path = self.expand(path)
        logger.info(path)
        # check if file already exists
        if os.path.exists(path) and not force:
            return False
        with open(path, "wb" if binary else "w") as out:
            out.write(content)
        logger.info("saved")
        return True

    def save_file_content(self, filename, content, path=None, force=True,
                          append=False):
        """Write *content* in binary mode to *filename*, joined to *path*.

        Returns ``(success, message)``. Nothing is written when the file
        exists and *force* is not set. *append* adds to the end of the file
        instead of replacing it.
        """
        if path:
            filename = os.path.join(path, filename)
        filename = self.expand(filename)
        # check if file already exists
        if os.path.exists(filename) and not force:
            return False, "The file '%s' already exists!" % filename
        with open(filename, "ab+" if append else "wb") as out:
            out.write(content)
        return True, "File saved!"

    def get_file_content(self, path):
        """Return the raw content of *path*, read in binary mode."""
        path = self.expand(path)
        with open(path, "rb") as f:
            return f.read()

    def get_mtime(self, path):
        """Return the modification time of *path* (``os.path.getmtime``)."""
        return os.path.getmtime(self.expand(path))

    def is_browser_file(self, path):
        """Tell whether the extension of *path* is in BROWSER_EXTENSIONS."""
        extension = self.expand(path).split(".")[-1]
        return extension.lower() in FileSystem.BROWSER_EXTENSIONS

    def handle_file_name_collision(self, name, path):
        """Return a variant of *name* that does not exist in *path* yet.

        *name* is returned unchanged if it is free. Otherwise a counter is
        put in front of the extension: ``a.txt`` becomes ``a_1.txt`` and
        ``a_1.txt`` becomes ``a_2.txt``, repeated until the name is free.
        *path* must be an existing directory, ``os.listdir`` raises
        otherwise.
        """
        # one listing is enough, the candidates are derived in memory
        existing = set(os.listdir(path))
        while name in existing:
            name = self._next_free_candidate(name)
        return name

    @staticmethod
    def _next_free_candidate(name):
        """Return *name* with its ``_<n>`` counter added or incremented."""
        # Split at the LAST dot only; splitting on every occurrence of
        # ".ext" truncates names such as "a.txt.txt".
        stem, dot, extension = name.rpartition(".")
        if dot:
            suffix = dot + extension
        else:
            stem, suffix = name, ""
        # has the name already a counter at its end?
        base, underscore, counter = stem.rpartition("_")
        if underscore:
            try:
                return "%s_%d%s" % (base, int(counter) + 1, suffix)
            except ValueError:
                pass  # the tail is not a number, start a new counter
        return "%s_1%s" % (stem, suffix)

    def expand(self, path):
        """Expand environment variables and a leading ``~`` in *path*."""
        return os.path.expanduser(os.path.expandvars(path))

    def thumbnail(self, path, width=100, height=100, sharpen=True):
        """Return JPEG data of a thumbnail that fits *width* x *height*.

        PIL is used when available, the ``convert`` executable otherwise.
        Without either, the unmodified file content is returned. *sharpen*
        is only honoured by the PIL code path.
        """
        path = self.expand(path)
        if HAVE_PIL:
            img = Image.open(path)
            img.thumbnail((width, height), Image.ANTIALIAS)
            # Palette and alpha images can neither be filtered nor stored
            # as JPEG, so bring them into a supported mode first.
            if img.mode not in ("RGB", "L", "CMYK"):
                img = img.convert("RGB")
            if sharpen:
                # filter() returns a new image, it does not work in place
                img = img.filter(ImageFilter.SHARPEN)
            output = StringIO()
            try:
                img.save(output, "JPEG")
                return output.getvalue()
            finally:
                output.close()
        elif HAVE_CONVERT:
            # use the executable that was located at import time
            cmd = [convert_executable, path, '-thumbnail',
                   '%dx%d' % (width, height), 'jpeg:-']
            p = subprocess.Popen(cmd, stdin=None, stdout=subprocess.PIPE)
            return p.communicate()[0]
        else:
            return self.get_file_content(path)

    def _monitor_callback(self, event):
        """Forward a monitor event to every listener of its watch.

        ``'access'`` events are ignored. After ``'delete self'`` all
        listeners of the watch are dropped and the watch is released.
        """
        if event.action_name == 'access':
            return
        with self._monitor_lock:
            watch = event.watch
            # inform all listener
            for combined_id in watch.listener:
                vispa.remote.send_topic(
                    "extension.%s.socket.watch" % combined_id[1],
                    window_id=combined_id[0],
                    data={
                        "path": watch.path,
                        "action_name": event.action_name,
                        "watch_id": combined_id[2]
                    })
            # cleanup yourself
            if event.action_name == 'delete self':
                for combined_id in watch.listener:
                    self._monitor_listener.pop(combined_id, None)
                # The list has to be emptied as well, otherwise
                # _check_watch never sees the watch as unused and stale
                # ids block a later watch() on the same path.
                del watch.listener[:]
                self._check_watch(watch)

    def _check_watch(self, watch):
        """Release *watch* if it has no listeners left.

        Callers hold ``_monitor_lock``.
        """
        if len(watch.listener) == 0:
            self._monitor_watches.pop(watch.path, None)
            self._monitor_thread.remove_watch(watch)

    def watch(self, path, window_id, view_id, watch_id):
        """Register ``(window_id, view_id, watch_id)`` as listener of *path*.

        An id triple that is already bound to another path is moved to the
        new one. Returns ``""`` on success and an error message otherwise.
        """
        # fail if there is no such file
        path = self.expand(path)
        if not os.path.exists(path):
            return "The file does not exist"
        combined_id = (window_id, view_id, watch_id)
        with self._monitor_lock:
            # watch is in use
            if combined_id in self._monitor_listener:
                current = self._monitor_listener[combined_id]
                # has path changed?
                if current.path == path:
                    return ""
                current.listener.remove(combined_id)
                self._check_watch(current)
                del self._monitor_listener[combined_id]

            # create or get the desired watch
            if path in self._monitor_watches:
                watch = self._monitor_watches[path]
            else:
                if os.path.isfile(path):
                    watch = self._monitor_thread.add_file_watch(path)
                elif os.path.isdir(path):
                    watch = self._monitor_thread.add_dir_watch(path)
                else:
                    return "This kind of file can't be wachted"
                watch.listener = []
                self._monitor_watches[path] = watch

            if combined_id in watch.listener:
                # this should never occur
                return "This file is already watched"
            watch.listener.append(combined_id)
            self._monitor_listener[combined_id] = watch
        return ""

    def unwatch(self, window_id, view_id, watch_id=None):
        """Remove one listener, or all listeners of a view.

        Without *watch_id* every listener registered for *window_id* and
        *view_id* is removed. Watches left without listeners are released.
        Always returns ``""``.
        """
        with self._monitor_lock:
            if watch_id is None:
                # will perform poor with lots of watches/listener
                # Iterate over snapshots: _check_watch alters the dict and
                # the listener lists shrink while matches are removed.
                for watch in list(self._monitor_watches.values()):
                    obsolete = [
                        combined_id for combined_id in watch.listener
                        if window_id == combined_id[0]
                        and view_id == combined_id[1]
                    ]
                    for combined_id in obsolete:
                        watch.listener.remove(combined_id)
                        self._monitor_listener.pop(combined_id, None)
                    self._check_watch(watch)
            else:
                combined_id = (window_id, view_id, watch_id)
                if combined_id in self._monitor_listener:
                    watch = self._monitor_listener.pop(combined_id)
                    watch.listener.remove(combined_id)
                    self._check_watch(watch)
        return ""


def string_compare(a, b):
    """Three-way comparison: ``0`` if equal, ``1`` if a > b, else ``-1``."""
    if a == b:
        return 0
    return 1 if a > b else -1


def file_compare(a, b):
    """Three-way comparison that sorts names starting with a dot last.

    Two names of the same kind (both with or both without leading dot) are
    ordered by ``string_compare``.
    """
    a_hidden = a.startswith('.')
    b_hidden = b.startswith('.')
    if a_hidden == b_hidden:
        return string_compare(a, b)
    return 1 if a_hidden else -1