Source code for fussy.nbio

"""Wraps subprocess with pipe semantics and generator based multiplexing

pipe = open( somefile, 'rb' ) | nbio.Process( ['grep','blue'] ) | nbio.Process( ['wc','-l'])

"""
import os, select, fcntl, itertools, time, sys, popen2, signal, logging, subprocess, errno
log = logging.getLogger( __name__ )

class _DidNothing( object ):
    """Returned when a subprocess didn't do anything"""
    def __nonzero__( self ):
        return False
DID_NOTHING = _DidNothing()

class NBIOError( RuntimeError ):
    """Base class for nbio errors"""

class ProcessError( NBIOError ):
    """Called process returned an error code

    Attributes:

        process -- the Process (if applicable) which raised the error
    """
    process = None

def pause( duration ):
    """Allow tests to override sleeping using globalsub"""
    time.sleep( duration )

class Pipe( object ):
    """Pipe of N processes which all need to process data in parallel"""
    pause_on_silence = 0.01
    started = False
    def __init__( self, *processes ):
        self.processes = []
        if processes:
            first = self.append( self.get_component(processes[0]) )
            for process in processes[1:]:
                self.__or__( process )
    def __repr__( self ):
        return '%s => %s'%(
            self.__class__.__name__,
            ' | '.join( [str(p) for p in self.processes] ),
        )
    def __getitem__( self, index ):
        """Retrieve a particular item in the pipe"""
        return self.processes[index]
    def __len__( self ):
        """Return the number of items in this pipe"""
        return len( self.processes )
    def __iter__( self ):
        """Iterate over the processes in the pipe

        If the stdout/stderr of the processes is not captured, then we will
        yield the results in whatever chunk-size is yielded from the
        individual processes.

        If all of the processes yield DID_NOTHING in a particular cycle, then
        the pipe will do a pause() for self.pause_on_silence (normally passed
        into the __call__) before the next iteration.
        """
        try:
            self.started = True
            iterables = [ iter(process) for process in self.processes ]
            crashed = False
            while iterables and not crashed:
                exhausted = []
                something_happened = 0
                for iterable in iterables:
                    try:
                        child_result = iterable.next()
                        if child_result is DID_NOTHING:
                            pass
                        elif child_result:
                            something_happened += 1
                            yield child_result
                    except StopIteration, err:
                        exhausted.append( iterable )
                    except Exception, err:
                        # Have the *first* item that failed raise the error,
                        # and only if they don't, raise this error...
                        for item in self.processes:
                            item.check_exit()
                        raise
                for done in exhausted:
                    while done in iterables:
                        iterables.remove( done )
                if not something_happened:
                    pause( self.pause_on_silence )
        finally:
            self.kill()
    def __call__( self, pause_on_silence=0.01 ):
        """Iterate over this pipeline, returning combined results as a string"""
        result = []
        self.pause_on_silence = pause_on_silence
        try:
            for item in itertools.ifilter( bool, self ):
                if item:
                    result.append( item )
            return "".join( result )
        except Exception, err:
            err.output = result
            raise
    def append( self, process ):
        """Add the given PipeComponent to this pipe (note: does not connect stdin/stdout)"""
        assert isinstance( process, PipeComponent ), process
        self.processes.append( process )
    def prepend( self, process ):
        """Add the given PipeComponent to this pipe (note: does not connect stdin/stdout)"""
        assert isinstance( process, PipeComponent ), process
        self.processes.insert( 0, process )
    @property
    def first( self ):
        """Retrieves the first item in the pipe"""
        if self.processes:
            return self.processes[0]
        raise IndexError( "No processes yet in this pipeline" )
    @property
    def last( self ):
        """Retrieves the last item in the pipe"""
        if self.processes:
            return self.processes[-1]
        raise IndexError( "No processes yet in this pipeline" )
    def __or__( self, other ):
        """Pipe our output into a process, callable or list"""
        other = self.get_component( other )
        log.debug( 'Hooking output of %s to input of %s', self.last, other )
        self.last.iterables.append( other.iter_write( self.last.iter_read() ) )
        self.append( other )
        return self
    def __ror__( self, other ):
        """Pipe output of other into our first item"""
        log.debug( 'Hooking output of %s to input of %s', other, self.first )
        other = self.get_component( other )
        self.first.iterables.append( self.first.iter_write( other.iter_read() ) )
        self.prepend( other )
        return self
    def __gt__( self, other ):
        """Pipe our output into a file"""
        if isinstance( other, (str,unicode)):
            if other not in ('','-'):
                return self.__or__( open(other,'wb') )
            else:
                return self.__or__( other )
        else:
            return self.__or__( other )
    def __lt__( self, other ):
        """Pipe input from a named file"""
        if isinstance( other, (str,unicode)):
            if other not in ('','-'):
                return self.__ror__( open(other,'rb') )
            else:
                return self.__ror__( other )
        else:
            return self.__ror__( other )
    def get_component( self, other ):
        """Given a python object other, create a PipeComponent for it

        The purpose of this method is to allow for fairly "natural"
        descriptions of tasks.  You can pipe to or from files, to or from the
        string '-' (stdin/stdout), to the string '' (collect stdout), or from
        a regular string (which is treated as input).  You can pipe iterables
        into a pipe, you can pipe the result of pipes into callables.
        """
        if isinstance( other, PipeComponent ):
            pass
        elif isinstance( other, file ):
            other = FileComponent( other )
        elif isinstance( other, (str,unicode)):
            if other == '-':
                other = FileComponent( sys.stdout, sys.stdin )
            elif other == '':
                other = IterComponent( )
            else:
                other = IterComponent( [other] )
        elif hasattr( other, '__iter__'):
            other = IterComponent( other )
        elif callable( other ):
            other = FunctionComponent( other )
        if not isinstance( other, PipeComponent ):
            raise TypeError(
                """Require a PipeComponent-compatible object, got: %s"""%(other,)
            )
        return other
    def kill( self ):
        for process in self.processes:
            if hasattr( process, 'kill' ):
                try:
                    process.kill()
                except Exception, err:
                    log.warn( "Unable to kill process: %s", process, )

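# A minimal usage sketch (illustrative only; nothing in this module calls it):
# feed a literal string through grep and collect the combined output as a
# string.  The literal data and the helper name _example_pipe are hypothetical
# examples, not part of the fussy.nbio API.  Note how get_component() coerces
# the plain string into stdin for the first process.
def _example_pipe():
    pipe = 'blue fish\nred fish\n' | Pipe( Process( ['grep', 'blue'] ) )
    # should return 'blue fish\n' (grep's stdout, joined into one string)
    return pipe()
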
class PipeComponent( object ):
    live = True
    def __init__( self ):
        self.iterables = []
    def __iter__( self ):
        iterables = [ iter(x) for x in self.iterables ]
        while iterables and self.live:
            something_happened = 0
            exhausted = []
            for iterable in iterables:
                try:
                    child_result = iterable.next()
                    if child_result is DID_NOTHING:
                        pass
                    elif child_result:
                        something_happened += 1
                        yield child_result
                except StopIteration, err:
                    exhausted.append( iterable )
                except Exception, err:
                    err.args += (self,)
                    raise
            for done in exhausted:
                while done in iterables:
                    iterables.remove( done )
            if not something_happened:
                yield DID_NOTHING
        self.check_exit()
    def check_exit( self ):
        pass
    def iter_read( self ):
        """Iterate reading from stdout"""
        raise TypeError( "Do not have an iter read for %s"%(self.__class__, ))
    def iter_write( self, source ):
        raise TypeError( "Do not have an iter write for %s"%(self.__class__, ))

class FileComponent( PipeComponent ):
    def __init__( self, filein, fileout=None ):
        self.stdin = filein
        self.stdout = fileout or filein
        super( FileComponent, self ).__init__()
    def iter_read( self ):
        return reader( self.stdout )
    def iter_write( self, source ):
        return writeiter( source, self.stdin, )
    def __repr__( self ):
        if self.stdin != self.stdout:
            return '%s/%s'%( self.stdout, self.stdin )
        else:
            return 'file( %r, %r )'%( self.stdin.name, self.stdin.mode )

class Process( PipeComponent ):
    """A particular process in a Pipe

    Processes are the most common entry point when using nbio: you create
    processes and pipe data into or out of them as appropriate to create
    Pipes.

    Under the covers the Process runs subprocess.Popen, and it accepts most
    of the fields subprocess.Popen does.  By default it captures stdout and
    pipes data into stdin.  If nothing is connected to stdin then stdin is
    closed on the first iteration of the pipe.  If nothing is connected to
    stdout or stderr (if stderr is captured) then the results will be
    returned to the caller joined together with ''

    The implication is that if you do not want to store all of the results
    in RAM, you need to "sink" the results into a process or file, or *not*
    capture the results (pass False for stdout or stderr).
    """
    stdin_needed = False
    stdout_needed = False
    stderr_needed = False
    STDOUT = -1
    by_line = False
    def __init__( self, command, stderr=False, stdout=True, stdin=True, **named ):
        """Initialize the Process

        command -- subprocess.Popen command string or list
            if a string, and "shell" is not explicitly set, then
            will set "shell=True"
        stdin -- whether to provide stdin writing
        stdout -- whether to capture stdout
        stderr -- whether to capture stderr, if -1, then combine stdout and stderr
        good_exit -- if provided, iterable which provides the set of good exit
            codes which will not raise ProcessError when encountered
        by_line -- if provided, will cause the output to be line-buffered so
            that only full lines will be reported, the '\\n' character will be
            used to split the output, so there will be no '\\n' character at
            the end of each line.
        named -- passed to the subprocess.Popen() command
        """
        if isinstance( command, (str,unicode)):
            if not 'shell' in named:
                named['shell'] = True
        self.command = command
        if 'good_exit' in named:
            self.good_exit = named.pop( 'good_exit' )
        else:
            self.good_exit = [0]
        if 'by_line' in named:
            self.by_line = named.pop( 'by_line' )
        self.pipe = self.start_pipe( stdin, stdout, stderr, **named )
        super( Process, self ).__init__( )
    def __unicode__( self ):
        return u'%s( %r )'%( self.__class__.__name__, self.command )
    __repr__ = __unicode__
    @property
    def stderr( self ):
        return self.pipe.stderr
    @property
    def stdout( self ):
        return self.pipe.stdout
    @property
    def stdin( self ):
        return self.pipe.stdin
    def start_pipe( self, stdin, stdout, stderr, **named ):
        """Start the captive process (internal operation)"""
        if stderr == -1:
            # combine stderr into the captured stdout stream
            stderr = subprocess.STDOUT
            stdout = True
        else:
            stderr = [None,subprocess.PIPE][bool(stderr)]
        pipe = subprocess.Popen(
            self.command,
            stdin = [None,subprocess.PIPE][bool(stdin)],
            stdout = [None,subprocess.PIPE][bool(stdout)],
            stderr = stderr,
            **named
        )
        return pipe
    def __iter__( self ):
        """Iterate over the results of the process (normally done by the Pipe)"""
        if not self.stdout_needed:
            # nothing has been hooked to stdout
            if self.stdout:
                # but stdout was captured, so we need to consume it to prevent deadlocks
                self.iterables.append( self.iter_read() )
        if not self.stderr_needed:
            # nothing has been hooked to stderr
            if self.stderr:
                # drain captured stderr so its output is yielded to the caller
                self.iterables.append( reader( self.stderr ) )
        if not self.stdin_needed:
            # nothing is being sent in, so input is finished...
            close( self.stdin )
        return super( Process, self ).__iter__()
    def __or__( self, other ):
        """Pipe our output into a process, callable or list

        pipe = Pipe( Process( 'cat test.txt' ) | Process( 'grep blue' ) | [] )
        pipe()
        """
        self.stdout_needed = True
        if isinstance( other, Pipe ):
            # Pipe our output into the pipe, add ourselves to the start of pipe...
            return other.__ror__( self )
        else:
            pipe = Pipe( self )
            return pipe.__or__( other )
    def __ror__( self, other ):
        """Pipe other into self"""
        self.stdin_needed = True
        if isinstance( other, Pipe ):
            return other.__or__( self )
        else:
            pipe = Pipe( self )
            return pipe.__ror__( other )
    def __gt__( self, other ):
        """Pipe our output into a filename"""
        pipe = Pipe( self )
        return pipe.__gt__( other )
    def __lt__( self, other ):
        """Pipe our input from a filename"""
        pipe = Pipe( self )
        return pipe.__lt__( other )
    def __call__( self, *args, **named ):
        """Create a Pipe and run it with just this item as its children"""
        pipe = Pipe( self )
        return pipe( *args, **named )
    def check_exit( self ):
        """Check our exit code"""
        if self.pipe.returncode is None:
            exitcode = self.pipe.poll()
        else:
            exitcode = self.pipe.returncode
        if exitcode < 0:
            self.pipe.kill()
            exitcode = self.pipe.poll()
        if exitcode is None:
            log.error( "Process %s appears not to have exited properly", self )
        elif exitcode not in self.good_exit:
            err = ProcessError(
                "Process %s returned error code %s"%( self.command, exitcode, ),
            )
            err.process = self
            raise err
    def iter_read( self ):
        """Create the thing which iterates our read operation"""
        self.stdout_needed = True
        output = reader( self.stdout )
        if self.by_line:
            output = by_line( output )
        return output
    def iter_write( self, source ):
        """Create a thing which will read from source and write to us"""
        self.stdin_needed = True
        return writeiter( source, self.stdin, )
    def kill( self ):
        """Kill our underlying subprocess.Popen"""
        try:
            return self.pipe.kill( )
        except OSError, err:
            # we've already died/closed...
            pass

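# An illustrative sketch (not part of the module) of the Process options
# described in __init__ above: good_exit lets non-zero exit codes pass
# without ProcessError, and by_line reports complete lines rather than
# arbitrary chunks.  The literal data, the sink callable and the helper name
# _example_process_options are hypothetical.
def _example_process_options():
    lines = []
    pipe = 'red fish\nblue fish\n' | Process(
        ['grep', 'fish'], good_exit=(0, 1), by_line=True,
    ) | lines.append
    pipe()
    # lines should be ['red fish', 'blue fish'] (no trailing newlines)
    return lines
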
class FunctionComponent( PipeComponent ):
    def __init__( self, function ):
        self.function = function
        super( FunctionComponent, self ).__init__()
    def iter_read( self ):
        yield self.function()
    def iter_write( self, source ):
        return caller( source, self.function )
    def __str__( self ):
        return str( self.function )

class IterComponent( PipeComponent ):
    def __init__( self, iterable=None ):
        self.iterable = iterable
        super( IterComponent, self ).__init__()
    def iter_read( self ):
        return self.iterable
    def iter_write( self, other ):
        return other
    def __repr__( self ):
        if self.iterable:
            return str( self.iterable )
        else:
            return '<str>'

def collector( iterable, target ):
    return caller( iterable, target.append )

def caller( iterable, target ):
    for item in iter(iterable):
        if item:
            target( item )
        yield item

#def pipeline( *commands ):
#    pipe = []
#    previous_stdout = subprocess.PIPE
#    for command in commands:
#        pipe.append( subprocess.Popen( command, stdin=previous_stdout, stdout=subprocess.PIPE ))
#        previous_stdout = pipe[-1].stdout
#    return pipe

def writeiter( iterator, fh ):
    """Write content from iterator to fh

    To write a file from a read file:

    .. code-block:: python

        writeiter( reader( open( filename )), fh )

    To write a request.response object into a tar pipe iteratively:

    .. code-block:: python

        writeiter( response.iter_content( 4096, decode_unicode=False ), pipe )
    """
    total_written = 0
    assert hasattr( iterator, '__iter__'), iterator
    for content in iterator:
        if content and isinstance( content, (str,unicode)):
            for block_written in writer( content, fh ):
                if block_written:
                    # writer() yields DID_NOTHING when nothing was written
                    total_written += block_written
                yield None
        else:
            yield DID_NOTHING
    close( fh )

def writer( content, fh, encoding='utf8' ):
    """Continue writing content (string) to fh until content is consumed

    Used by writeiter to write individual bits of content to the fh
    """
    fp = fileno( fh )
    if isinstance( content, unicode ):
        content = content.encode( encoding )
    assert isinstance( content, str ), """Can only write string values: %r"""%(content)
    while content:
        try:
            written = os.write( fp, content )
        except ValueError, err:
            log.warn( 'Unconsumed content: %s', content )
            break
        if not written:
            yield DID_NOTHING
        else:
            content = content[written:]
            yield written

def reader( fh, blocksize=4096 ):
    """Produce content blocks from fh without blocking"""
    try:
        fd = fileno( fh )
    except ValueError, err:
        return
    total = 0
    while not fh.closed:
        try:
            rr,rw,re = select.select( [fh],[],[], 0 )
            if rr:
                result = os.read( fd, blocksize )
                if result == '':
                    break
            else:
                result = DID_NOTHING
        except ValueError, err:
            break
        except (IOError,OSError), err:
            if err.args[0] in ( errno.EWOULDBLOCK, ):
                if fh.closed:
                    break
                yield DID_NOTHING
            else:
                raise
        else:
            yield result
    close( fh )

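# A tiny illustrative sketch (not called by the module) of reader()'s
# non-blocking behaviour on an empty pipe: it yields DID_NOTHING instead of
# blocking, then yields real data once something is written.  The helper name
# _example_reader and the sample bytes are hypothetical.
def _example_reader():
    read_fd, write_fd = os.pipe()
    source = os.fdopen( read_fd, 'rb' )
    blocks = reader( source )
    empty = blocks.next()            # DID_NOTHING: nothing available yet
    os.write( write_fd, 'hello\n' )
    data = blocks.next()             # 'hello\n'
    return empty is DID_NOTHING, data
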
def fileno( fh ):
    "Determine the fileno for the file-like thing"
    if hasattr( fh, 'fileno' ):
        return fh.fileno()
    return fh

def close( fh ):
    """Close the file/socket/closable thing"""
    if fh is sys.stdout or fh is sys.stderr:
        return
    if hasattr( fh, 'close'):
        errcode = fh.close()
    else:
        fn = fileno( fh )
        if fn and isinstance( fn, (int,long)):
            errcode = os.close( fileno(fh) )
        else:
            errcode = 0
    if errcode:
        raise RuntimeError( "Child process returned error code", errcode )

def by_line( iterable ):
    """Buffer iterable yielding individual lines"""
    buffer = ""
    for item in iterable:
        if not item:
            yield item
        else:
            buffer += item
            if '\n' not in item:
                # We did something, but we're not ready to yield a real value...
                yield None
            else:
                while '\n' in buffer:
                    line, buffer = buffer.split('\n',1)
                    yield line
    if buffer:
        yield buffer

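# A quick illustrative check (not used by the module) of by_line's buffering:
# partial chunks are held until a newline arrives, and each yielded value is a
# complete line without its trailing '\n'.  The helper name _example_by_line
# and the sample chunks are hypothetical.
def _example_by_line():
    chunks = ['first li', 'ne\nsecond', ' line\n']
    # filter out the None "still buffering" placeholders
    return [ line for line in by_line( chunks ) if line ]
    # -> ['first line', 'second line']
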
