Commit 9278aa0c authored by Janez K's avatar Janez K
Browse files

Merge branch 'speedup' into dev

parents 3be5187a 16edca33
[report]
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
pragma: no cover
# Don't complain about missing debug-only code:
def __repr__
if self\.debug
if settings\.DEBUG
# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError
# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
\ No newline at end of file
......@@ -28,3 +28,5 @@ build/
/mothra/mothra.db-journal
celerybeat-schedule
.coverage
htmlcov
\ No newline at end of file
coverage run --source='.' manage.py test
coverage html --omit=workflows/segmine/data/mappings.py
import workflows.library
import time
import random
class WidgetRunner(object):
    """Executes a single widget in memory on behalf of a WorkflowRunner.

    Input values are pulled from the workflow runner's cached outputs and
    results are written onto the widget's output objects. Nothing is saved
    unless the runner was created with ``standalone=True``.
    """

    def __init__(self, widget, workflow_runner, standalone=False):
        """Bind the runner to a widget and the workflow runner that owns it.

        widget          -- the widget to execute
        workflow_runner -- runner of the workflow the widget belongs to
        standalone      -- when true, the widget is replaced by the
                           workflow runner's own instance with the same id
                           and everything is saved at the end of run()

        Raises Exception if ``standalone`` is set and the workflow runner
        holds no widget with the same id.
        """
        self.widget = widget
        self.inputs = widget.inputs.all()
        self.output = widget.outputs.all()
        self.workflow_runner = workflow_runner
        self.inner_workflow_runner = None
        self.standalone = standalone
        if self.standalone:
            # Work on the workflow runner's instance so that state changes
            # are made on the objects the workflow runner looks at.
            for w in self.workflow_runner.widgets:
                if w.id == self.widget.id:
                    self.widget = w
                    break
            else:
                raise Exception("this shouldn't happen!")

    def run(self):
        """Run the widget according to its type and mark it as finished.

        Exceptions raised while executing a regular/subprocess widget are
        re-raised after the widget has been flagged with ``error=True``.
        """
        self.widget.running = True
        widget_type = self.widget.type
        if widget_type == 'regular' or widget_type == 'subprocess':
            self._run_function_or_subprocess()
        elif widget_type == 'input':
            # Copy the values of the enclosing workflow's inputs inwards.
            for o in self.widget.outputs.all():
                o.value = self.workflow_runner.parent.inputs[o.outer_input_id].value
        elif widget_type == 'output':
            # Called for its side effect: it refreshes every input's value.
            self.get_input_dictionary()
            for i in self.widget.inputs.all():
                self.workflow_runner.parent.outputs[i.outer_output_id].value = i.value
        elif widget_type in ('for_output', 'cv_output'):
            # Loop outputs accumulate one value per iteration in the
            # enclosing workflow's output.
            self.get_input_dictionary()
            for i in self.widget.inputs.all():
                self.workflow_runner.parent.outputs[i.outer_output_id].value.append(i.value)
        self.widget.running = False
        self.widget.error = False
        self.widget.finished = True
        if self.standalone:
            self.save()

    def _run_function_or_subprocess(self):
        """Call the widget's library function, or run its inner workflow."""
        abstract_widget = self.widget.abstract_widget
        if abstract_widget:
            function_to_call = getattr(workflows.library, abstract_widget.action)
        input_dict = self.get_input_dictionary()
        outputs = {}
        start = time.time()
        try:
            if abstract_widget:
                if abstract_widget.wsdl != '':
                    # The WSDL settings live on the abstract widget; the
                    # runner itself has no ``abstract_widget`` attribute.
                    input_dict['wsdl'] = abstract_widget.wsdl
                    input_dict['wsdl_method'] = abstract_widget.wsdl_method
                if abstract_widget.has_progress_bar:
                    outputs = function_to_call(input_dict, self.widget)
                elif abstract_widget.is_streaming:
                    outputs = function_to_call(input_dict, self.widget, None)
                else:
                    outputs = function_to_call(input_dict)
            else:
                # No abstract widget: the widget wraps an inner workflow.
                self.inner_workflow_runner = WorkflowRunner(self.widget.workflow_link, parent=self.workflow_runner)
                self.inner_workflow_runner.run()
        except:
            # Deliberately broad: flag the widget whatever went wrong, then
            # let the original exception propagate unchanged.
            self.widget.error = True
            self.widget.running = False
            self.widget.finished = False
            raise
        elapsed = (time.time() - start)
        outputs['clowdflows_elapsed'] = elapsed
        self.assign_outputs(outputs)

    def assign_outputs(self, outputs):
        """Copy values from ``outputs`` onto the widget outputs, by variable.

        Outputs whose variable is missing from ``outputs`` keep their value.
        """
        for o in self.widget.outputs.all():
            try:
                o.value = outputs[o.variable]
            except KeyError:
                pass

    def get_input_dictionary(self):
        """Refresh the input values and return them keyed by variable name.

        Non-parameter inputs take the value of the output connected to them,
        or None when nothing is connected. Inputs with a non-zero
        ``multi_id`` are collected into a list that skips None values.
        """
        input_dictionary = {}
        for i in self.widget.inputs.all():
            if not i.parameter:
                # Not a parameter, so the value comes from the connected output.
                connection = self.workflow_runner.get_connection_for_input(i)
                if connection:
                    i.value = self.workflow_runner.outputs[connection.output_id].value
                else:
                    i.value = None
            if i.multi_id == 0:
                input_dictionary[i.variable] = i.value
            else:  # it's a multiple input
                values = input_dictionary.setdefault(i.variable, [])
                if i.value is not None:
                    values.append(i.value)
        return input_dictionary

    def save(self):
        """Save every input, every output and finally the widget itself."""
        for i in self.widget.inputs.all():
            i.save()
        for o in self.widget.outputs.all():
            o.save()
        self.widget.save()
class WorkflowRunner(object):
    """Runs the widgets of one workflow in memory and saves them afterwards.

    The runner keeps the workflow's widgets and connections, plus id-keyed
    dictionaries of every input and output. ``parent`` is the runner of the
    enclosing workflow when this workflow is executed as a subprocess.
    """

    def __init__(self, workflow, clean=True, parent=None):
        """Collect the widgets, connections, inputs and outputs of ``workflow``.

        workflow -- the workflow to run
        clean    -- when true, cleanup() resets the finished/error flags
        parent   -- runner of the enclosing workflow, if any
        """
        self.workflow = workflow
        self.connections = workflow.connections.all()
        self.widgets = workflow.widgets.all().select_related('abstract_widget').prefetch_related('inputs', 'outputs')
        self.inputs = {}
        self.outputs = {}
        for w in self.widgets:
            for i in w.inputs.all():
                self.inputs[i.id] = i
            for o in w.outputs.all():
                self.outputs[o.id] = o
        self.clean = clean
        self.parent = parent

    def _last_widget_of_type(self, widget_type):
        """Return the last widget whose type is ``widget_type``, or None."""
        found = None
        for w in self.widgets:
            if w.type == widget_type:
                found = w
        return found

    def is_for_loop(self):
        """Return True if the workflow contains a 'for_input' widget."""
        return any(w.type == 'for_input' for w in self.widgets)

    def is_cross_validation(self):
        """Return True if the workflow contains a 'cv_input' widget."""
        return any(w.type == 'cv_input' for w in self.widgets)

    def cleanup(self):
        """Reset the finished and error flags of all widgets if ``clean`` is set."""
        if not self.clean:
            return
        for w in self.widgets:
            w.finished = False
            w.error = False

    def get_connection_for_output(self, output):
        """Return the first connection leaving ``output``, or None."""
        for c in self.connections:
            if c.output_id == output.id:
                return c
        return None

    def get_connection_for_input(self, input):
        """Return the first connection entering ``input``, or None."""
        for c in self.connections:
            if c.input_id == input.id:
                return c
        return None

    @property
    def finished_widgets(self):
        """Widgets whose ``finished`` flag is set."""
        return [w for w in self.widgets if w.finished]

    @property
    def unfinished_widgets(self):
        """Widgets that are neither finished, nor running, nor in error."""
        return [w for w in self.widgets
                if not w.finished and not w.running and not w.error]

    @property
    def runnable_widgets(self):
        """Unfinished widgets whose upstream widgets have all finished.

        A widget is runnable if every widget with an output connected to
        one of its inputs is finished.
        """
        # A set keeps the membership test constant-time inside the nested scan.
        finished_widget_ids = set(w.id for w in self.finished_widgets)
        runnable = []
        for w in self.unfinished_widgets:
            blocked = any(
                self.inputs[c.input_id].widget_id == w.id
                and self.outputs[c.output_id].widget_id not in finished_widget_ids
                for c in self.connections)
            if not blocked:
                runnable.append(w)
        return runnable

    def run_all_unfinished_widgets(self):
        """Run batches of runnable widgets until none are left."""
        runnable_widgets = self.runnable_widgets
        while runnable_widgets:
            for w in runnable_widgets:
                wr = WidgetRunner(w, self)
                wr.run()
            runnable_widgets = self.runnable_widgets

    def run(self):
        """Run the workflow as a for loop, a cross validation or a plain
        workflow, depending on the widgets it contains, then save it."""
        self.cleanup()
        if self.is_for_loop():
            self._run_for_loop()
        elif self.is_cross_validation():
            self._run_cross_validation()
        else:
            self.run_all_unfinished_widgets()
        self.save()

    def _run_for_loop(self):
        """Run the widgets once for every element of the parent's input list."""
        fi = self._last_widget_of_type('for_input')
        fo = self._last_widget_of_type('for_output')
        # The for_output widget appends one result per iteration to this list.
        outer_output = self.parent.outputs[fo.inputs.all()[0].outer_output_id]
        outer_output.value = []
        input_list = self.parent.inputs[fi.outputs.all()[0].outer_input_id].value
        for item in input_list:
            self.cleanup()
            proper_output = fi.outputs.all()[0]
            proper_output.value = item
            # The for_input widget is never executed; it only feeds the value.
            fi.finished = True
            self.run_all_unfinished_widgets()

    def _run_cross_validation(self):
        """Split the parent's input into folds and run the widgets per fold.

        Raises Exception if the input to split is empty.
        """
        fi = self._last_widget_of_type('cv_input')
        fo = self._last_widget_of_type('cv_output')
        outer_output = self.parent.outputs[fo.inputs.all()[0].outer_output_id]
        outer_output.value = []
        input_list = self.parent.inputs[fi.outputs.all()[0].outer_input_id].value
        input_fold = self.parent.inputs[fi.outputs.all()[1].outer_input_id].value
        input_seed = self.parent.inputs[fi.outputs.all()[2].outer_input_id].value
        if input_fold is not None:
            input_fold = int(input_fold)
        else:
            input_fold = 10
        if input_seed is not None:
            input_seed = int(input_seed)
        else:
            input_seed = random.randint(0, 10**9)
        input_type = input_list.__class__.__name__
        context = None
        if input_type == 'DBContext':
            # Split the context's target table instead of the context itself.
            context = input_list
            input_list = context.orng_tables.get(context.target_table, None)
        if not input_list:
            raise Exception('CrossValidation: Empty input list!')
        is_orange_table = hasattr(input_list, "get_items_ref")
        folds = []
        if is_orange_table:
            import orange
            indices = orange.MakeRandomIndicesCV(input_list, randseed=input_seed, folds=input_fold, stratified=orange.MakeRandomIndices.Stratified)
            for i in range(input_fold):
                output_train = input_list.select(indices, i, negate=1)
                output_test = input_list.select(indices, i)
                output_train.name = input_list.name
                output_test.name = input_list.name
                folds.append((output_train, output_test))
        else:
            # Seeds the module-level generator and shuffles the list in place.
            random.seed(input_seed)
            random.shuffle(input_list)
            folds = [input_list[i::input_fold] for i in range(input_fold)]
        # Expose the seed that was actually used.
        proper_output = fi.outputs.all()[2]
        proper_output.value = input_seed
        for index, fold in enumerate(folds):
            if is_orange_table:
                output_train, output_test = fold
            else:
                # The training value is the list of the remaining folds.
                output_train = folds[:index] + folds[index + 1:]
                output_test = fold
            if context is not None:
                output_train_obj = context.copy()
                output_train_obj.orng_tables[context.target_table] = output_train
                output_test_obj = context.copy()
                output_test_obj.orng_tables[context.target_table] = output_test
                output_train = output_train_obj
                output_test = output_test_obj
            self.cleanup()
            proper_output = fi.outputs.all()[0]  # inner output
            proper_output.value = output_train
            proper_output = fi.outputs.all()[1]  # inner output
            proper_output.value = output_test
            fi.finished = True  # set the input widget as finished
            self.run_all_unfinished_widgets()

    def save(self):
        """Save every input, output and widget with ``force_update=True``."""
        for w in self.widgets:
            for i in w.inputs.all():
                i.save(force_update=True)
            for o in w.outputs.all():
                o.save(force_update=True)
            w.save(force_update=True)
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
......@@ -21,4 +21,4 @@ class HarfLearner(orngRFCons.RandomForestLearner):
self.learner
class HarfClassifier(orngRFCons.RandomForestClassifier):
#class HarfClassifier(orngRFCons.RandomForestClassifier):
......@@ -19,6 +19,8 @@ if USE_CONCURRENCY:
from workflows.tasks import executeWidgetFunction, executeWidgetProgressBar, executeWidgetStreaming, executeWidgetWithRequest, runWidget, executeWidgetPostInteract
from workflows.engine import WidgetRunner, WorkflowRunner
class WidgetException(Exception):
    """Exception type for widget errors (where it is raised is not visible here)."""
......@@ -157,7 +159,7 @@ class Workflow(models.Model):
for w in widgets:
if not w.finished and not w.running:
ready_to_run = True
connections = self.connections.filter(input__widget=w)
connections = self.connections.filter(input__widget=w).select_related('input__widget')
for c in connections:
if not c.output.widget.finished:
#print c.output.widget
......@@ -167,17 +169,46 @@ class Workflow(models.Model):
unfinished_list.append(w)
return unfinished_list
"""def run_for_loop(self):
widgets = self.widgets.all().prefetch_related('inputs','outputs')
connections = self.connections.all().select_related('input','output','input__widget','output__widget')
fi = None
fo = None
for w in widgets:
if w.type=='for_input':
fi = w
if w.type=='for_output':
fo = w
outer_output = fo.inputs.all()[0].outer_output
outer_output.value=[]
outer_output.save()
total = len(widgets)
input_list = fi.outputs.all()[0].outer_input.value # get all inputs from outer part
progress_total = len(input_list) # for progress bar
current_iteration = 0
for i in input_list:
finished = []
unfinished_list = []
fi.finished = True
proper_output = fi.outputs.all()[0]"""
def run_for_loop(self):
""" Method runs the workflow for loop. The use of [0] at the end of lines is because
there can be only one for loop in one workflow. This way we take the first one. """
#clear for_input and for_output
#print("run_for_loop")
fi = self.widgets.filter(type='for_input')[0]
fo = self.widgets.filter(type='for_output')[0]
widgets = self.widgets.all().prefetch_related('inputs','outputs')
fi = None
fo = None
for w in widgets:
if w.type=='for_input':
fi = w
if w.type=='for_output':
fo = w
outer_output = fo.inputs.all()[0].outer_output
outer_output.value=[]
outer_output.save()
total = len(widgets)
input_list = fi.outputs.all()[0].outer_input.value # get all inputs from outer part
progress_total = len(input_list) # for progress bar
current_iteration = 0
......@@ -198,10 +229,12 @@ class Workflow(models.Model):
while len(unfinished_list)>0:
for w in unfinished_list:
w.run(True) # run the widget
total = self.widgets.count()
completed = self.widgets.filter(finished=True).count()
self.widget.progress = (int)((current_iteration*100.0/progress_total)+(((completed*1.0)/total)*(100/progress_total)))
self.widget.save()
completed = 0
for w in widgets:
if w.finished:
completed = completed+1
self.widget.progress = (int)((current_iteration*100.0/progress_total)+(((completed*1.0)/total)*(100/progress_total)))
self.widget.save()
unfinished_list = self.get_runnable_widgets()
except:
raise
......@@ -763,16 +796,9 @@ class Widget(models.Model):
""" else run abstract widget function """
outputs = function_to_call(input_dict)
else:
if self.workflow_link.is_for_loop():
""" if this is object is a for loop than true and run;
else false and run workflow """
#print("proper_run_is_for_loop")
self.workflow_link.run_for_loop()
#print self.outputs.all()[0].value
elif self.workflow_link.is_cross_validation():
self.workflow_link.run_cross_validation()
else:
self.workflow_link.run()
wr = WidgetRunner(self,workflow_runner=WorkflowRunner(self.workflow,clean=False),standalone=True)
wr.run()
return
except:
self.error=True
self.running=False
......@@ -854,16 +880,23 @@ class Widget(models.Model):
elif self.type == 'cv_output':
""" if object is an output widget for cross validation,
then read output values and configure parameters"""
print "smo v cv_output"
for i in self.inputs.all():
if not i.parameter:
""" if there is a connection than true and read the output value """
if i.connections.count() > 0:
if i.value is None:
print "1"
i.value = [i.connections.all()[0].output.value]
else:
print "2"
i.value = [i.connections.all()[0].output.value] + i.value
print i.value
#print i.value
i.save()
print "----"
print i.outer_output.value
print "----"
i.outer_output.value.append(i.value)
i.outer_output.save()
self.finished=True
......
......@@ -6,11 +6,46 @@ Replace this with more appropriate tests for your application.
"""
from django.test import TestCase
from workflows.engine import WorkflowRunner, WidgetRunner
from workflows.models import Workflow, Widget
import time
class WorkflowEngineTest(TestCase):
    """Runs a fixture workflow through WorkflowRunner and checks a widget output."""

    fixtures = ['test_data']

    def test_fast_workflow_runner(self):
        """Running 'For loop test' leaves [20, 40, 60, 80] on widget 6's first output."""
        workflow = Workflow.objects.get(name='For loop test')
        WorkflowRunner(workflow).run()
        # Reload the widget so the value checked is the one that was saved.
        first_output = Widget.objects.get(id=6).outputs.all()[0]
        self.assertEqual(first_output.value, [20, 40, 60, 80])
class SimpleTest(TestCase):
    """Placeholder sanity test."""

    def test_basic_addition(self):
        """Check that 1 + 1 evaluates to 2."""
        expected = 2
        self.assertEqual(1 + 1, expected)
def test_fast_workflow_runner_cv(self):
    """Running 'Cross test' leaves the three expected folds on widget 16's first output."""
    workflow = Workflow.objects.get(name='Cross test')
    WorkflowRunner(workflow).run()
    # Each entry is [training folds, test fold, 1].
    expected_folds = [
        [[[u'2'], [u'1']], [u'3'], 1],
        [[[u'3'], [u'1']], [u'2'], 1],
        [[[u'3'], [u'2']], [u'1'], 1],
    ]
    first_output = Widget.objects.get(id=16).outputs.all()[0]
    self.assertEqual(first_output.value, expected_folds)
class WidgetEngineTest(TestCase):
    """Runs single fixture widgets through a standalone WidgetRunner."""

    fixtures = ['test_data2']

    def _first_output_after_standalone_run(self, widget_id):
        """Run the widget standalone and return its reloaded first output value."""
        widget = Widget.objects.get(id=widget_id)
        workflow_runner = WorkflowRunner(widget.workflow, clean=False)
        WidgetRunner(widget, workflow_runner=workflow_runner, standalone=True).run()
        # Fetch the widget again so the value checked is the one that was saved.
        reloaded = Widget.objects.get(id=widget_id)
        return reloaded.outputs.all()[0].value

    def test_fast_widget_runner(self):
        """Widget 6 ends up with [20, 40, 60, 80] on its first output."""
        self.assertEqual(self._first_output_after_standalone_run(6), [20, 40, 60, 80])

    def test_fast_widget_runner_cv(self):
        """Widget 16 ends up with the three expected folds on its first output."""
        expected_folds = [
            [[[u'2'], [u'1']], [u'3'], 1],
            [[[u'3'], [u'1']], [u'2'], 1],
            [[[u'3'], [u'2']], [u'1'], 1],
        ]
        self.assertEqual(self._first_output_after_standalone_run(16), expected_folds)
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment