Source code for scibeam.util.pipeline

# pipeline.py --- 
# 
# Filename: pipeline.py
# Description: 
#           pipeline to chain series of process
# Author:    Yu Lu
# Email:     yulu@utexas.edu
# Github:    https://github.com/SuperYuLu 
# 
# Created: Tue May 15 00:55:01 2018 (-0500)
# Version: 
# Last-Updated: Wed Aug 22 11:42:46 2018 (-0500)
#           By: yulu
#     Update #: 31
# 

import pandas

[docs]class Pipeline:
[docs] def __init__(self, steps, momery = None): """ [(name, func, func_kwargs)] """ self.names = steps self.tasks = steps self.params = steps #self._validate_steps() self.memory = memory
@property def names(self): return self.__names @names.setter def names(self, steps): namelist = [] for i, s in enumerate(steps): if len(s) == 1: step_name = 'step' + str(i) elif len(s) > 1: step_name = s[0] if type(s[0]) == str else 'step' + str(i) else: raise ValueError("[*] Task cannot be empty !" "Step %s in pipeline is null" %str(i)) namelist.append(step_name) self.__names = namelist @property def tasks(self): return self.__tasks @tasks.setter def tasks(self, steps): tasklist = [] for i, t in enumerate(steps): if len(t) == 1: step_task = t[0] elif len(t) > 1: step_task = t[1] if type(t[0]) == str else t[0] else: raise ValueError("[*] Task cannot be empty !" "Step %s in pipeline is null" %str(i)) self.__tasks = tasklist @property def params(self): return self.__params @params.setter def params(self, steps): paramslist = [] for p in steps: if len(p) == 3: step_params = p[2] elif len(p) == 2 and type(p[1]) == dict: step_params = p[1] else: continue return paramslist def _validate_steps(self): names, tasks = zip(*self.steps) for t in tasks[:-1]: if t is None: continue if not (isinstance(t, pandas.DataFrame) == isinstance(t, pandas.Series)): raise TypeError("All pipeline tasks should be instance of padas.Dataframe of pandas.Series" " '%s' (type %s) doesn't" % (t, type(t)))
[docs] def apply(self, inputs): input_value, input_label = zip(*inputs) for val in input_value: valIn = val for task in self.tasks: valOut = task(valIn, **self.params) valIn = task(valIn, **self.params)