#! /usr/bin/env python

"""This is Tom's personal Python subroutine library.

You are not expected to understand this.

Warning: this file contains HIGH MAGIC and DEEP VOODOO, as well as quite a
bit of QUESTIONABLE CODE and very few COMMENTS. Use with caution.

I am trying to use a naming convention for generators, or other methods which
return iterators where you might have expected a proper sequence, like that in
itertools, ie calling things 'ipotatoes' instead of just 'potatoes'.

I will attempt to keep an up-to-date version of this at or near .

The current version's details are #{None}#.

(c) 2006 Tom Anderson

Redistribution and use in source and binary forms, with or without
modification, are permitted.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""

# ---------------------------------------------------------------------------
# Private compatibility shims: the code below is written to run unchanged on
# both Python 2 (2.5+) and Python 3, so names that only exist on one side are
# resolved once, here.
# ---------------------------------------------------------------------------

try:
    _string_types = (basestring,)
    _text_type = unicode
except NameError:  # Python 3
    _string_types = (str, bytes)
    _text_type = str

try:
    _cmp = cmp
except NameError:  # Python 3 has no builtin cmp
    def _cmp(a, b):
        "Three-way comparison: negative, zero or positive as a <, ==, > b."
        return (a > b) - (a < b)

try:
    from itertools import izip as _izip
except ImportError:  # Python 3: zip is already lazy
    _izip = zip

# both spellings of a newline, so line reading works on text and byte streams
_NEWLINES = ("\n", "\n".encode("ascii"))


POSITIVE_INFINITY = 1e300000  # best current practice from PEP 754
NEGATIVE_INFINITY = -POSITIVE_INFINITY
NaN = POSITIVE_INFINITY / POSITIVE_INFINITY


def functor(m):
    "Wrap a mapping (or sequence) m as a one-argument function k -> m[k]."
    return lambda k: m[k]


def find(seq, thing, off=0, dummy=-1):
    "Like str.find for any sequence with an index method: returns the index of thing at or after off, or dummy if it is absent."
    try:
        return seq.index(thing, off)
    except ValueError:
        return dummy


def findall(s, x):
    "Return a list of every index at which x occurs in s; see ifindall."
    return list(ifindall(s, x))


def ifindall(s, x):
    "Iterate over every index at which x occurs in s, which may be a string or any sequence with an index method."
    if hasattr(s, "find"):
        locate = type(s).find  # i TAUGHT the Wu-Tang this trick
    else:
        # must not be called 'find': assigning to that name would make it
        # local and hide the module-level function we are trying to reach
        locate = find
    idx = -1
    while True:
        idx = locate(s, x, idx + 1)
        if idx == -1:
            return
        yield idx


def _read_stripped(path):
    "Read the file at path and return its stripped, non-blank, comment-free lines as a list."
    f = open(path)
    try:
        return list(stripped(f))
    finally:
        f.close()


def arguments(argv=None, expand=True):
    """Parse a command line into (args, flags).

    argv defaults to sys.argv[1:]. Items of the form -key, -key=value or
    -key:value go into the flags dict (value is "" if absent); everything else
    goes into the args list. A bare "--" ends flag processing. If expand is
    true, "@name" (or "@" followed by name) splices in the stripped lines of
    the file called name.
    """
    if argv is None:
        import sys
        argv = sys.argv[1:]
    argv = list(argv)
    args = []
    flags = {}
    while len(argv) > 0:
        arg = argv.pop(0)
        if arg == "--":
            args.extend(argv)
            break
        elif expand and arg.startswith("@"):
            if len(arg) > 1:
                arg = arg[1:]
            else:
                arg = argv.pop(0)
            argv[0:0] = _read_stripped(arg)
        elif arg.startswith("-") and (len(arg) > 1):
            arg = arg[1:]
            # the key ends at whichever of ':' and '=' comes first
            cut = min(find(arg, ":", 0, len(arg)), find(arg, "=", 0, len(arg)))
            if cut < len(arg):
                key = arg[:cut]
                value = arg[(cut + 1):]
            else:
                key = arg
                value = ""
            flags[key] = value
        else:
            args.append(arg)
    return args, flags


def split(l, sep):
    "Split the iterable l into a list of non-empty lists, breaking at (and dropping) items equal to sep, or for which sep is true if it is callable."
    if not callable(sep):
        oldsep = sep
        sep = lambda x: x == oldsep
    parts = []
    part = []
    for i in l:
        if sep(i):
            if len(part) > 0:
                parts.append(part)
            part = []
        else:
            part.append(i)
    if len(part) > 0:
        parts.append(part)
    return parts


def unzip(l):
    "Transpose a sequence of rows into a list of column lists; the number of columns is taken from the first row. An empty input gives []."
    if len(l) == 0:
        return []
    columns = [[] for _ in range(len(l[0]))]
    for row in l:
        for j, column in enumerate(columns):
            column.append(row[j])
    return columns


def fold(s, l=80):
    """Splits a sequence into a list of subsequences of the given length. The name is taken from the command-line tool in unix.

    The final element holds whatever is left over, and is therefore empty when
    len(s) is an exact multiple of l (including when s itself is empty).
    """
    whole = len(s) // l  # floor division: '/' is float division on Python 3
    return [s[(i * l):((i + 1) * l)] for i in range(whole)] + [s[(whole * l):]]


def imap(fn, it):
    "Lazily apply fn to each item of a single iterable."
    for thing in it:
        yield fn(thing)


def dmap(fn, d):
    "Return a new dict with the keys of d and fn applied to each value."
    e = {}
    for key in d:
        e[key] = fn(d[key])
    return e


def choose(p, t, f):
    "The ternary operator; note that it always evaluates both branches"
    if p:
        return t
    else:
        return f


def list_get(s, i, d=None):
    "Like dict.get for lists"
    try:
        return s[i]
    except IndexError:
        return d


_STDIO_NULL = ("-", None)


def _stdstream(name, mode, default):
    "Open the file called name in the given mode, or return default if name is '-' or None."
    if name not in _STDIO_NULL:
        return open(name, mode)
    else:
        return default


# todo: if input and output filenames are the same (and not "-"), create a temp file for output, and copy over or rename on close or something
def stdio(names, universal=False):
    "Given a list of names (which may have 0, 1 or >1 elements), return a pair of files to use as standard input and output."
    import sys
    # the 'U' flag only exists on Python 2; Python 3 text mode already does
    # universal newlines, and rejects 'U' outright from 3.11
    if sys.version_info[0] >= 3:
        readmode = "r"
    else:
        readmode = choose(universal, "rU", "r")
    stdin = _stdstream(list_get(names, 0), readmode, sys.stdin)
    stdout = _stdstream(list_get(names, 1), "w", sys.stdout)
    return stdin, stdout


def stripped(f, lstrip=True):
    "Return an iterator over the strings in the iterable f in which strings are stripped of #-delimited comments and leading and trailing whitespace, and blank strings are skipped."
    for line in f:
        if "#" in line:
            line = line[:line.index("#")]
        if lstrip:
            line = line.strip()
        else:
            line = line.rstrip()
        if line == "":
            continue
        yield line
    # falling off the end finishes the generator; an explicit
    # 'raise StopIteration' here becomes a RuntimeError under PEP 479


# Re-export the builtin where there is one, and supply a stand-in where there
# is not. (Testing hasattr(__builtins__, "sorted") does not work: in an
# imported module __builtins__ is a dict, so the stand-in always won.)
try:
    sorted = sorted  # yes really
except NameError:
    def sorted(l, cmp=None, key=None, rev=False):
        assert key is None, "key not supported"
        l = list(l)
        l.sort(cmp)
        if rev:
            l.reverse()
        return l


def delay(fn, *args):
    "Returns a delayed version of a function call; to force it, call it with no arguments."
    return lambda: fn(*args)


def returner(x):
    "Return a no-argument function which returns x."
    return lambda: x


def yielder(x):
    "Return an iterator which yields x once."
    yield x


def lift(fn):
    "Lift fn to work elementwise: the result maps fn over its argument sequences and returns a list."
    return lambda *args: list(map(fn, *args))


def partial(fn, *args):
    "Bind args as the leading positional arguments of fn."
    return lambda *args2: fn(*(args + args2))


def rpartial(fn, *args):
    "Bind args as the trailing positional arguments of fn."
    return lambda *args2: fn(*(args2 + args))


def visit(fn, seq):
    "Call fn on each item of seq, discarding the results."
    for thing in seq:
        fn(thing)


def explodeargs(fn):
    "Adapt fn so that it takes its arguments as a single sequence."
    return lambda args: fn(*args)


innerstar = explodeargs


def implodeargs(fn):
    "Adapt fn, which takes a single sequence, so that it takes separate arguments."
    return lambda *args: fn(args)


outerstar = implodeargs


def identity(x):
    "Return x unchanged."
    return x


# NB has to be here because it references the identity function in a static way - not cool with this
def os_environ(keys, default=None, parse=identity, logdefault=None):
    """Return parse(value) for the first of keys which is set in the environment, or default.

    keys may be a single name or a list of names. Any failure while looking up
    or parsing is deliberately swallowed and treated as 'not set'. If
    logdefault is given, falling back to the default is logged with that
    string as the source.
    """
    if isinstance(keys, _string_types):
        keys = [keys]
    try:
        import os
        for key in keys:
            if key in os.environ:
                return parse(os.environ[key])
    except Exception:  # best effort by design: fall through to the default
        pass
    if logdefault is not None:
        log(logdefault, None, ", ".join(keys), "not set in environment, defaulting to", default)
    return default


class limitedfile(object):
    "A read-only view of the next lim units of an already-open file f, starting from wherever f is currently positioned."

    # todo: add support for an offset as well; at present, you get this by seeking on f before making the limited file

    def __init__(self, f, lim):
        self.f = f
        self.lim = lim
        self.n = 0  # current file pointer; should be called i or fp, or could just use f.tell() + stashed initial fp throughout
        self.closed = False
        if hasattr(f, "name"):
            self.name = f.name

    def read(self, n=-1):
        "Read at most n units, never going past the limit; a negative or omitted n reads everything that remains."
        remaining = self.available()
        if (n is None) or (n < 0):
            n = remaining
        else:
            n = min(n, remaining)
        if n <= 0:
            return ""
        s = self.f.read(n)
        self.n = self.n + len(s)
        return s

    def available(self):
        "Return how much may still be read before the limit is reached."
        return self.lim - self.n

    def close(self):
        "Close the underlying file."
        if self.f is not None:
            self.f.close()
            self.f = None
        self.closed = True

    def __iter__(self):
        return self

    def next(self):
        "Return the next line, raising StopIteration once there is nothing left."
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    __next__ = next  # Python 3 spelling of the iterator protocol

    def readline(self):
        "Read up to and including the next newline, or to the limit or end of file; returns an empty string when exhausted."
        # todo: see if there's a cleverer way to do this; if we're allowed to overstep interally, could use f.readline()
        pieces = []
        while True:
            ch = self.read(1)
            if not ch:
                break
            pieces.append(ch)
            if ch in _NEWLINES:
                break
        if not pieces:
            return ""
        # join with an empty string of the same type as what was read
        return pieces[0][:0].join(pieces)

    def readlines(self):
        "Return all the remaining lines as a list."
        return list(self)

    def xreadlines(self):
        return self

    def seek(self, off, whence=0):
        "Seek within the window. Absolute offsets (whence 0) are positions in the underlying file; end-relative seeks are not supported."
        if whence == 0:
            here = self.f.tell()
            start = here - self.n
            end = start + self.lim
            assert ((off >= start) and (off <= end)), "absolute seek out of bounds: " + str(off)
            self.f.seek(off, 0)
            self.n = off - start
        elif whence == 1:
            if off == 0:
                return  # no need to do anything
            if off < 0:
                assert (-off <= self.n), "relative seek out of bounds: " + str(off)
            else:  # (off > 0)
                assert (off <= self.available()), "relative seek out of bounds: " + str(off)
            self.f.seek(off, 1)
            self.n = self.n + off
        elif whence == 2:
            raise IOError("sorry, can't do end-relative seeks")
        else:
            raise IOError("whenceof thou speakest is known not: " + str(whence))

    def tell(self):
        "Return the position in the underlying file (not relative to the start of the window)."
        return self.f.tell()


def log(src, obj, *msg):
    "Write a log message to stderr from component src, concerning object obj, consisting of the items listed as msg."
    import sys
    if obj is not None:
        line = src + ": " + str(obj) + ": " + " ".join(map(str, msg)) + "\n"
    else:
        line = src + ": " + " ".join(map(str, msg)) + "\n"
    sys.stderr.write(line)
    sys.stderr.flush()


def isscalar(x):
    "Return true if x is not something to iterate into; strings count as scalars."
    # strings are checked explicitly because on Python 3 they have __iter__
    return isinstance(x, _string_types) or (not hasattr(x, "__iter__"))  # todo: is there a better way?


def os_spawnv(mode, path, args, usepath=False, env=None):
    "This is just a wrapper round the os.spawnv* functions. Use spawn() instead."
    import os
    if env is not None:
        if usepath:
            status = os.spawnvpe(mode, path, args, env)
        else:
            status = os.spawnve(mode, path, args, env)
    else:
        if usepath:
            status = os.spawnvp(mode, path, args)
        else:
            status = os.spawnv(mode, path, args)
    return status


def spawn(path, args=None, usepath=False, env=None):
    """Waits for completion of the subprocess.

    args are the arguments after the program name, which is supplied
    automatically from path. Raises EnvironmentError (errno set to the exit
    status or signal number) if the command fails; otherwise returns 0.
    """
    import os
    args = [os.path.split(path)[1]] + list(args or [])
    status = os_spawnv(os.P_WAIT, path, args, usepath, env)
    if status != 0:
        if status < 0:
            raise EnvironmentError(-status, "command " + path + " killed by signal")
        else:
            raise EnvironmentError(status, "command " + path + " exited with nonzero status")
    return status


def os_execv(path, args, usepath=False, env=None):
    "This is just a wrapper round the os.execv* functions. There's a good chance you want the spawn function, above."
    import os
    if env is not None:
        if usepath:
            os.execvpe(path, args, env)
        else:
            os.execve(path, args, env)
    else:
        if usepath:
            os.execvp(path, args)
        else:
            os.execv(path, args)


def setfd(src, dst, closesrc=False):
    "Make file descriptor dst a copy of src, or close dst if src is None; if closesrc is true, src is then closed."
    import os
    if src is None:
        os.close(dst)
    else:
        os.dup2(src, dst)
        # never close a missing source, nor the descriptor we just set up
        if closesrc and (src != dst):
            os.close(src)


def first(l):
    "Returns the first non-None element in a list"
    for x in l:
        if x is not None:
            return x


def filldefaults(values, defaults):
    "Return a list in which each None or missing item of values is replaced by the corresponding item of defaults."
    values = list(values)
    defaults = list(defaults)
    # pad explicitly: this used to lean on Python 2's map() padding with None
    stretch(values, len(defaults))
    stretch(defaults, len(values))
    return [first(pair) for pair in zip(values, defaults)]


# todo for both spawns: nonblocking mode, returning pids; capture output as a file; capture output as a string or string list

def waitpid(pid, cmd=""):
    "Wait for process pid, raising EnvironmentError if it was killed by a signal or exited with a nonzero status; cmd is only used in the message."
    import os
    pid, status = os.waitpid(pid, 0)
    # todo: factor
    signum = status & 0x7f
    core = (status & 0x80) != 0
    code = (status >> 8) & 0xff
    if signum != 0:
        if core:
            cmsg = " (core dumped)"
        else:
            cmsg = ""
        raise EnvironmentError(signum, "command " + cmd + " (" + str(pid) + ") killed by signal" + cmsg)
    if code != 0:
        raise EnvironmentError(code, "command " + cmd + " (" + str(pid) + ") exited with nonzero status")


def spawnpipe(cmds, usepath=False, env=None, supply=None, show=True):
    """Waits for completion of the subprocess. supply is a readable fd used as the head of the pipeline; None means it's blanked off.

    Each item of cmds is either a path, or a tuple (path, args, usepath, env)
    whose missing or None items take the defaults given to this function. The
    output of the last stage is copied to stdout if show is true, and
    discarded otherwise. Raises EnvironmentError if any stage fails.
    """
    import os
    import sys
    pids = []
    defaults = (None, [], usepath, env)
    ours = False  # whether supply is a pipe end made here, which we must close
    for cmd in cmds:
        if isinstance(cmd, _string_types):
            cmd = (cmd,)
        path, args, _usepath, _env = filldefaults(cmd, defaults)
        args = [os.path.split(path)[1]] + list(args)
        piperead, pipewrite = os.pipe()
        pid = os.fork()
        if pid == 0:  # child; exec a stage
            try:
                try:
                    os.close(piperead)
                    setfd(supply, sys.stdin.fileno(), True)
                    setfd(pipewrite, sys.stdout.fileno(), True)
                    os_execv(path, args, _usepath, _env)
                except Exception:
                    log("spawnpipe", path, "could not be executed:", sys.exc_info()[1])
            finally:
                # only reached if the exec failed; the child must never fall
                # back into the parent's code
                os._exit(127)
        # parent
        os.close(pipewrite)
        if ours:
            os.close(supply)  # the child has its own copy of the previous stage's output
        supply = piperead
        ours = True
        pids.append((path, pid))
    if supply is None:
        return  # no stages and no supply: nothing to pump
    tail = os.fdopen(supply)
    try:
        for line in tail:  # todo: fix so last stage writes straight to stdout, instead of having to be pumped
            if show:
                sys.stdout.write(line)
    finally:
        tail.close()
    for path, pid in pids:
        waitpid(pid, path)
    # todo: flush?


def demacifyPath(path):
    "MAJOR HACK!"
    if not isinstance(path, _text_type):
        path = _text_type(path, "mac_roman")
    return "/" + "/".join(path.replace("/", "\\/").split(":")[1:])


def ensuredirs(path):
    "Create the directory path, and any missing parents, unless it already exists."
    import os
    if not os.path.isdir(path):
        os.makedirs(path)


def replace(s, a, b):
    "Return an iterable in which all instances of a are replaced with b. Does not do this in place."
    for x in s:
        if x == a:
            yield b
        else:
            yield x


def statframe(f):
    "Returns a tuple (function_name, filename, lineno) of key properties about a stack frame. Assumes traditional python frames and code objects."
    co = f.f_code
    name = co.co_name
    filename = co.co_filename
    lineno = f.f_lineno
    return (name, filename, lineno)


def deprecate(msg):
    "Prints a deprecation warning with some reflected details. Assumes sys._getframe and statframe work."
    import sys
    target_name, target_filename, target_lineno = statframe(sys._getframe(1))
    caller_name, caller_filename, caller_lineno = statframe(sys._getframe(2))
    log("*** DEPRECATED", target_name, "called by", caller_name, "in", caller_filename, "at", caller_lineno, ";", msg)


def dashfile(name, mode="r"):
    "Deprecated: open the file called name, with '-' meaning stdin or stdout according to mode. Use stdio instead."
    deprecate("use twic.stdio instead of twic.dashfile")
    if name == "-":
        import sys
        if mode == "r":
            return sys.stdin
        elif mode in "wa":
            return sys.stdout
        else:
            raise IOError("unsupported mode: " + mode)
    else:
        return open(name, mode)


def walkfiles(top, topdown=True, onerror=None):
    "Iterate over the names of all the files beneath top, as given by os.walk."
    # NOTE(review): this yields bare file names, without their directory - confirm that is what callers want
    import os
    for dirpath, subdirs, names in os.walk(top, topdown, onerror):
        for name in names:
            yield name


def uniq(s):
    "Iterate over the items of s, skipping any already seen; items must be hashable."
    seen = set()
    for x in s:
        if x not in seen:
            seen.add(x)
            yield x


def chain(r):
    "Iterate over the items of each of the iterables in r in turn."
    # todo: decide if i can deprecate this and use walk or flatten instead
    for s in r:
        for x in s:
            yield x


def isnone(x):
    "Return true if x is None."
    return x is None


def selectivemap(p, fn, l, m=None):
    """Return an iterator in which each item x in l for which p is true maps to fn(x), and those in which it is false map to x; if the optional sequence m is supplied, the predicate is applied to corresponding items of m rather than l.
    """
    if m is None:
        # test each item against itself; zipping l with itself would pair up
        # consecutive items whenever l is an iterator
        for x in l:
            if p(x):
                yield fn(x)
            else:
                yield x
        return
    for x, y in _izip(l, m):
        if p(y):
            yield fn(x)
        else:
            yield x


def stretch(l, n, pad=None):
    """Makes a list l at least n items long, optionally filling in the new spaces with the specified pad
    """
    import itertools
    m = n - len(l)
    if m > 0:
        l.extend(itertools.repeat(pad, m))


def walk(ss):
    """Recursively walk a sequence of sequences (of sequences etc), returning the non-sequence items
    """
    for x in ss:
        if isscalar(x):
            yield x
        else:
            for y in walk(x):
                yield y


def flattenOne(s):
    "Flatten a sequence of sequences to a single list."
    flat = []
    for part in s:
        flat.extend(part)
    return flat


def flatten(s):
    "Flatten a sequence of sequences, each of which may contain subsequences etc, to a single list."
    return list(walk(s))  # i have actually tested this against all comers, and it's faster


_TIMER_NAMES = {
    "darwin": "time",
    "linux2": "time",
    "sunos5": "time",
    "win32": "clock",
    "wince": "clock"
}


def timer():
    "Returns a function suitable for timing things; this returns the time in seconds since some arbitrary epoch, which may vary between runs."
    import sys
    import time
    timerName = _TIMER_NAMES.get(sys.platform, "time")
    # time.clock no longer exists on recent Pythons; fall back to time.time
    return getattr(time, timerName, time.time)


def timeit(fn, t=1.0):
    "Measures the time taken to execute a function fn, doing enough iterations to fill t seconds. Uses wall time."
    u = t / 2.0
    n = 0
    k = 100.0  # the most the iteration count may grow by in one step
    while u < t:
        if u > 0:
            growth = min((t / u), k)
        else:
            growth = k  # the last round was too quick to measure at all
        n = max(int(n * growth), (n + 1))
        u = _timeit(fn, n)
    return u / n


def _timeit(fn, n):
    "Return the time in seconds taken to call fn n times."
    import itertools
    clock = timer()
    it = itertools.repeat(None, n)
    t0 = clock()
    for i in it:
        fn()
    t1 = clock()
    return float((t1 - t0))


class lazyfile(object):
    "A lazily-opened file. Can be reopened."

    def __init__(self, name, mode="r"):
        self.name = name
        self.mode = mode
        self.f = None
        self.closed = True
        # could set softspace, but we ignore it, so that would be a lie

    def open(self):
        "Open the file if it is not open already, and return the underlying file object."
        if self.f is None:
            self.f = open(self.name, self.mode)
            self.closed = False
        return self.f

    def __iter__(self):
        return self

    def close(self):
        if self.f is not None:
            self.f.close()
            self.f = None
        self.closed = True

    def fileno(self):
        return self.open().fileno()

    def flush(self):
        if self.f is not None:
            self.f.flush()

    def isatty(self):
        return False  # ?

    def next(self):
        f = self.open()
        # file objects spell this 'next' on Python 2 and '__next__' on Python 3
        advance = getattr(f, "__next__", None) or f.next
        return advance()

    __next__ = next

    def read(self, size=-1):
        return self.open().read(size)

    def readline(self, size=-1):
        return self.open().readline(size)

    def readlines(self):
        return self.open().readlines()

    def seek(self, off, whence=0):
        self.open().seek(off, whence)

    def tell(self):
        return self.open().tell()

    def truncate(self, size=None):
        "Truncate the file to size, or to the current position if size is not given."
        if size is None:
            self.open().truncate()
        else:
            self.open().truncate(size)

    def write(self, s):
        self.open().write(s)

    def writelines(self, lines):
        self.open().writelines(lines)

    def xreadlines(self):
        return iter(self)


def iterchars(f):
    "Iterates over the characters in a file."
    while True:
        ch = f.read(1)
        if not ch:  # an empty read means end of file
            break
        yield ch


if False:
    # use re.sub, doh
    def funkyreplace(s, old, new):
        "A funked-up version of str.replace: old is a regular expression, and new is a format string, which is substituted with the matched groups before replacement."
        if isinstance(old, str):
            import re
            old = re.compile(old)
        start = 0
        while (start < len(s)):
            m = old.search(s, start)
            if (m == None):
                break
            replacement = new % m.groups()
            start = m.start()
            end = m.end()
            s = s[:start] + replacement + s[end:]
            start = end + (len(replacement) - (end - start))
        return s


# got these from http://www.hackcraft.net/web/datetime/
TIMEFMT_RFC1123 = "%a, %d %b %Y %H:%M:%S %Z"  # TIMEZONE IS WRONG! it should be in +hhmm format, not string
TIMEFMT_W3C8601 = "%Y-%m-%dT%H:%M:%S%Z"  # TIMEZONE IS WRONG! it should be in +hh:mm format, not string
TIMEFMT_LOCAL = "%c"  # heh
# and so ...
TIMEFMT_RFC1123_NOTZ = "%a, %d %b %Y %H:%M:%S"
TIMEFMT_W3C8601_NOTZ = "%Y-%m-%dT%H:%M:%S"

tzReStr = r"(.*?) ?(-|\+)?(\d{2}):?(\d{2})$"
tzRe = None


def mktime_utc(tm):
    "return the time that this time struct would mean if it was in the UTC timezone (ignoring its DST flag - i think)"
    import time
    t = time.mktime(tm)
    isdst = time.localtime(t).tm_isdst
    assert (isdst != -1)
    if isdst:
        tz = time.altzone
    else:
        tz = time.timezone
    return t - tz


def mkstrptimetz(s, fmt):
    """Parse the time string s, which may end in a numeric timezone such as +0100 or -05:30, and return seconds since the epoch.

    fmt is a strptime format describing everything before the timezone. A
    string with no signed offset on the end is taken to be in UTC.
    """
    global tzRe
    import time
    if tzRe is None:
        import re
        tzRe = re.compile(tzReStr)
    m = tzRe.match(s)
    # the pattern also matches four trailing digits with no sign, which is how
    # every timestamp without a zone ends; those are not offsets
    if (m is None) or (m.group(2) is None):
        s2 = s
        tz = 0
    else:
        s2, tzs, tzh, tzm = m.groups()
        tz = (int(tzs + "1")) * (((int(tzh) * 60) + int(tzm)) * 60)
    tm = time.strptime(s2, fmt)
    return mktime_utc(tm) - tz


def getmapped(m, k, fn, d=None):
    "Return fn(m[k]) if k is in m, or d otherwise."
    if k in m:
        return fn(m[k])
    else:
        return d


def alltrue(s):
    "Return true if every item of s is true (and so also if s is empty)."
    for p in s:
        if not p:
            return False
    return True


def reraise():
    "Re-raise the exception currently being handled, keeping its traceback."
    raise


def mins(s, cmp=_cmp):
    "Returns a list of the smallest elements of s, ie the elements x for which there is no y in s such that cmp(x, y) is positive. This is only really useful on sets with no total order. It is assumed that the comparison function is transitive."
    minima = []
    for x in s:
        minimal = True
        i = 0
        while i < len(minima):
            m = minima[i]
            d = cmp(x, m)
            if d > 0:  # x > m
                minimal = False
                break
            elif d < 0:  # x < m
                del minima[i]
            else:
                i = i + 1
        if minimal:
            minima.append(x)
    return minima


def maxs(s, cmp=_cmp):
    "See mins."
    return mins(s, lambda a, b: cmp(b, a))


def cmp_pareto(a, b, strict=True, cmp=_cmp):
    "A Pareto comparison of two sequences, a and b: a is greater than b if every element in a is greater than the corresponding element in b, and vice versa; sequences where there are elements in each which are greater than the corresponding element in the other are judged equal. If strict is false, the element only has to be greater than or equal to (although if all elements are equal, the two sequences are still equal)."
    agreater = False
    bgreater = False
    for x, y in _izip(a, b):
        d = cmp(x, y)
        if d > 0:  # a > b
            if bgreater:
                return 0
            agreater = True
        elif d < 0:  # a < b
            if agreater:
                return 0
            bgreater = True
        else:
            if strict:
                return 0
    assert (not (agreater and bgreater)), "DOES NOT COMPUTE"
    if agreater:
        return 1
    elif bgreater:
        return -1
    else:
        return 0


PS = os_environ("PS", "/bin/ps")


def ps(fullNames=True):
    "Returns an iterator over all the currently running processes, giving for each a tuple (pid, command, user, terminal, %cpu, %memory)."
    import subprocess
    if fullNames:
        flags = "-auwwx"
    else:
        flags = "-aucwwx"
    # universal_newlines makes the output text rather than bytes on Python 3
    proc = subprocess.Popen([PS, flags], bufsize=1, stdout=subprocess.PIPE, universal_newlines=True)
    try:
        proc.stdout.readline()  # discard the header line
        for line in proc.stdout:
            user, pidStr, cpuStr, memStr, vszStr, rssStr, tt, stat, startedStr, timeStr, command = line.rstrip().split(None, 10)
            pid = int(pidStr)
            cpu = float(cpuStr)
            mem = float(memStr)
            # yield (user, pid, cpu, mem, vsz, rss, tt, stat, startedStr, timeStr, cmd)
            yield (pid, command, user, tt, cpu, mem)
    finally:
        # reap the child even if the caller stops iterating early
        proc.stdout.close()
        proc.wait()


"""
 1) SIGHUP       2) SIGINT       3) SIGQUIT      4) SIGILL
 5) SIGTRAP      6) SIGABRT      7) SIGEMT       8) SIGFPE
 9) SIGKILL     10) SIGBUS      11) SIGSEGV     12) SIGSYS
13) SIGPIPE     14) SIGALRM     15) SIGTERM     16) SIGURG
17) SIGSTOP     18) SIGTSTP     19) SIGCONT     20) SIGCHLD
21) SIGTTIN     22) SIGTTOU     23) SIGIO       24) SIGXCPU
25) SIGXFSZ     26) SIGVTALRM   27) SIGPROF     28) SIGWINCH
29) SIGINFO     30) SIGUSR1     31) SIGUSR2
"""

KILL = os_environ("KILL", "/bin/kill")


def kill(pid, sig="TERM"):
    "Send the signal named sig to process pid, using the external kill command."
    assert (pid > 0), "can't kill init or everything this way; too much"
    args = ["-s", sig.upper(), str(pid)]
    spawn(KILL, args)


FUNKY_CHARS = {
    # stolen from Marcin Ciura, via Fredrik Lundh
    u'\N{Latin capital letter AE}': 'AE',
    u'\N{Latin small letter ae}': 'ae',
    u'\N{Latin capital letter Eth}': 'Dh',
    u'\N{Latin small letter eth}': 'dh',
    u'\N{Latin capital letter O with stroke}': 'Oe',
    u'\N{Latin small letter o with stroke}': 'oe',
    u'\N{Latin capital letter Thorn}': 'Th',
    u'\N{Latin small letter thorn}': 'th',
    u'\N{Latin small letter sharp s}': 'ss',
    u'\N{Latin capital letter D with stroke}': 'Dj',
    u'\N{Latin small letter d with stroke}': 'dj',
    u'\N{Latin capital letter H with stroke}': 'H',
    u'\N{Latin small letter h with stroke}': 'h',
    u'\N{Latin small letter dotless i}': 'i',
    u'\N{Latin small letter kra}': 'q',
    u'\N{Latin capital letter L with stroke}': 'L',
    u'\N{Latin small letter l with stroke}': 'l',
    u'\N{Latin capital letter Eng}': 'Ng',
    u'\N{Latin small letter eng}': 'ng',
    u'\N{Latin capital ligature OE}': 'Oe',
    u'\N{Latin small ligature oe}': 'oe',
    u'\N{Latin capital letter T with stroke}': 'Th',
    u'\N{Latin small letter t with stroke}': 'th'
}


def encode(s, enc="latin-1"):
    "Encode a unicode string s in encoding enc (defaulting to latin-1), dealing with freaky characters as well as possible."
    import sys
    import unicodedata
    while True:
        try:
            return s.encode(enc)
        except UnicodeEncodeError:
            e = sys.exc_info()[1]
            if not e.reason.startswith("ordinal not in range"):
                raise
            ch = s[e.start]
            decomposition = unicodedata.normalize("NFKD", ch)
            if len(decomposition) > 1:
                # keep the base character and drop whatever was combined with it
                replacement = decomposition[0]
            else:
                replacement = FUNKY_CHARS.get(ch, None)
                if replacement is None:
                    raise
            s = s.replace(ch, replacement)