Commit 16edca33 authored by Janez K's avatar Janez K
Browse files

faster subprocess and cross validation execution

parent a0824f74
[report]
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
pragma: no cover
# Don't complain about missing debug-only code:
def __repr__
if self\.debug
if settings\.DEBUG
# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError
# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
\ No newline at end of file
......@@ -28,3 +28,5 @@ build/
/mothra/mothra.db-journal
celerybeat-schedule
.coverage
htmlcov
\ No newline at end of file
coverage run --source='.' manage.py test
coverage html --omit=workflows/segmine/data/mappings.py
import workflows.library
import time
import random
class WidgetRunner():
def __init__(self,widget,workflow_runner):
def __init__(self,widget,workflow_runner,standalone=False):
self.widget = widget
self.inputs = widget.inputs.all()
self.output = widget.outputs.all()
self.input_data = {}
self.output_data = {}
self.workflow_runner = workflow_runner
self.inner_workflow_runner = None
if self.widget.abstract_widget is None:
self.inner_workflow_runner = WorkflowRunner(self.widget.workflow_link,clean=True,parent=self.workflow_runner)
self.standalone = standalone
if self.standalone:
for w in self.workflow_runner.widgets:
if w.id == self.widget.id:
self.widget = w
break
else:
raise Exception("this shouldn't happen!")
def run(self):
self.widget.running = True
""" subprocesses and regular widgets get treated here """
if self.widget.type == 'regular' or self.widget.type == 'subprocess':
if not self.widget.abstract_widget is None:
if self.widget.abstract_widget:
function_to_call = getattr(workflows.library,self.widget.abstract_widget.action)
input_dict = self.get_input_dictionary()
start = time.time()
try:
if self.widget.abstract_widget:
if self.widget.abstract_widget.wsdl != '':
input_dict['wsdl']=self.abstract_widget.wsdl
input_dict['wsdl_method']=self.abstract_widget.wsdl_method
if self.widget.abstract_widget.has_progress_bar:
outputs = function_to_call(input_dict,self.widget)
elif self.widget.abstract_widget.is_streaming:
outputs = function_to_call(input_dict,self.widget,None)
else:
outputs = function_to_call(input_dict)
input_dict = self.get_input_dictionary()
outputs = {}
start = time.time()
try:
if self.widget.abstract_widget:
if self.widget.abstract_widget.wsdl != '':
input_dict['wsdl']=self.abstract_widget.wsdl
input_dict['wsdl_method']=self.abstract_widget.wsdl_method
if self.widget.abstract_widget.has_progress_bar:
outputs = function_to_call(input_dict,self.widget)
elif self.widget.abstract_widget.is_streaming:
outputs = function_to_call(input_dict,self.widget,None)
else:
"""subworkflow"""
print "Subworkflow"
except:
self.widget.error=True
self.widget.running=False
self.widget.finished=False
raise
elapsed = (time.time()-start)
outputs['clowdflows_elapsed']=elapsed
self.assign_outputs(outputs)
outputs = function_to_call(input_dict)
else:
""" we run the subprocess """
self.inner_workflow_runner = WorkflowRunner(self.widget.workflow_link,parent=self.workflow_runner)
self.inner_workflow_runner.run()
except:
self.widget.error=True
self.widget.running=False
self.widget.finished=False
raise
elapsed = (time.time()-start)
outputs['clowdflows_elapsed']=elapsed
self.assign_outputs(outputs)
elif self.widget.type == 'input':
for o in self.widget.outputs.all():
o.value = self.workflow_runner.parent.inputs[o.outer_input_id].value
elif self.widget.type == 'output':
input_dict = self.get_input_dictionary()
for i in self.widget.inputs.all():
self.workflow_runner.parent.outputs[i.outer_output_id].value = i.value
elif self.widget.type == 'for_output':
input_dict = self.get_input_dictionary()
for i in self.widget.inputs.all():
self.workflow_runner.parent.outputs[i.outer_output_id].value.append(i.value)
elif self.widget.type == 'cv_output':
input_dict = self.get_input_dictionary()
for i in self.widget.inputs.all():
self.workflow_runner.parent.outputs[i.outer_output_id].value.append(i.value)
self.widget.running = False
self.widget.error = False
self.widget.finished = True
if self.standalone:
self.save()
def assign_outputs(self,outputs):
for o in self.widget.outputs.all():
if self.widget.abstract_widget:
try:
o.value = outputs[o.variable]
c = self.workflow_runner.get_connection_for_output(o)
if c:
c.output.value = outputs[o.variable]
except:
pass
else:
"""if not self.workflow_link.is_for_loop():
o.value = o.inner_input_id
"""
print "not implemented"
try:
o.value = outputs[o.variable]
except:
pass
def get_input_dictionary(self):
input_dictionary = {}
......@@ -71,7 +86,7 @@ class WidgetRunner():
if not i.parameter:
connection = self.workflow_runner.get_connection_for_input(i)
if connection:
i.value = connection.output.value
i.value = self.workflow_runner.outputs[connection.output_id].value
else:
i.value = None
""" here we assign the value to the dictionary """
......@@ -84,12 +99,28 @@ class WidgetRunner():
input_dictionary[i.variable].append(i.value)
return input_dictionary
def save(self):
for i in self.widget.inputs.all():
i.save()
for o in self.widget.outputs.all():
print o.value
o.save()
self.widget.save()
class WorkflowRunner():
def __init__(self,workflow,clean=True,parent=None):
self.workflow = workflow
self.connections = workflow.connections.all().select_related('input','output')
self.connections = workflow.connections.all()
self.widgets = workflow.widgets.all().select_related('abstract_widget').prefetch_related('inputs','outputs')
self.inputs = {}
self.outputs = {}
for w in self.widgets:
for i in w.inputs.all():
self.inputs[i.id] = i
for o in w.outputs.all():
self.outputs[o.id] = o
self.clean = clean
self.parent = parent
def is_for_loop(self):
for w in self.widgets:
......@@ -147,14 +178,13 @@ class WorkflowRunner():
for w in self.unfinished_widgets:
ready_to_run = True
for c in self.connections:
if c.input.widget_id == w.id and not c.output.widget_id in finished_widget_ids:
if self.inputs[c.input_id].widget_id == w.id and not self.outputs[c.output_id].widget_id in finished_widget_ids:
ready_to_run = False
if ready_to_run:
runnable.append(w)
return runnable
def run(self):
self.cleanup()
def run_all_unfinished_widgets(self):
runnable_widgets = self.runnable_widgets
while len(runnable_widgets)>0:
for w in runnable_widgets:
......@@ -162,10 +192,107 @@ class WorkflowRunner():
wr.run()
runnable_widgets = self.runnable_widgets
def run(self):
self.cleanup()
if self.is_for_loop():
fi = None
fo = None
for w in self.widgets:
if w.type=='for_input':
fi = w
if w.type=='for_output':
fo = w
outer_output = self.parent.outputs[fo.inputs.all()[0].outer_output_id]
outer_output.value = []
input_list = self.parent.inputs[fi.outputs.all()[0].outer_input_id].value
for i in input_list:
self.cleanup()
proper_output = fi.outputs.all()[0]
proper_output.value = i
fi.finished = True
self.run_all_unfinished_widgets()
elif self.is_cross_validation():
import random as rand
fi = None
fo = None
for w in self.widgets:
if w.type=='cv_input':
fi = w
if w.type=='cv_output':
fo = w
outer_output = self.parent.outputs[fo.inputs.all()[0].outer_output_id]
outer_output.value = []
input_list = self.parent.inputs[fi.outputs.all()[0].outer_input_id].value
input_fold = self.parent.inputs[fi.outputs.all()[1].outer_input_id].value
input_seed = self.parent.inputs[fi.outputs.all()[2].outer_input_id].value
if input_fold != None:
input_fold = int(input_fold)
else:
input_fold = 10
if input_seed != None:
input_seed = int(input_seed)
else:
input_seed = random.randint(0,10**9)
input_type = input_list.__class__.__name__
context = None
if input_type == 'DBContext':
context = input_list
input_list = context.orng_tables.get(context.target_table,None)
if not input_list:
raise Exception('CrossValidation: Empty input list!')
folds = []
if hasattr(input_list, "get_items_ref"):
import orange
indices = orange.MakeRandomIndicesCV(input_list, randseed=input_seed, folds=input_fold, stratified=orange.MakeRandomIndices.Stratified)
for i in range(input_fold):
output_train = input_list.select(indices, i, negate=1)
output_test = input_list.select(indices, i)
output_train.name = input_list.name
output_test.name = input_list.name
folds.append((output_train, output_test))
else:
rand.seed(input_seed)
rand.shuffle(input_list)
folds = [input_list[i::input_fold] for i in range(input_fold)]
proper_output = fi.outputs.all()[2]
proper_output.value = input_seed
for i in range(len(folds)):
#import pdb; pdb.set_trace()
if hasattr(input_list, "get_items_ref"):
output_test = folds[i][1]
output_train = folds[i][0]
else:
output_train = folds[:i] + folds[i+1:]
output_test = folds[i]
if input_type == 'DBContext':
output_train_obj = context.copy()
output_train_obj.orng_tables[context.target_table] = output_train
output_test_obj = context.copy()
output_test_obj.orng_tables[context.target_table] = output_test
output_train = output_train_obj
output_test = output_test_obj
self.cleanup()
proper_output = fi.outputs.all()[0] # inner output
proper_output.value = output_train
proper_output = fi.outputs.all()[1] # inner output
proper_output.value = output_test
fi.finished=True # set the input widget as finished
self.run_all_unfinished_widgets()
else:
self.run_all_unfinished_widgets()
self.save()
def save(self):
for w in self.widgets:
w.save()
for i in w.inputs.all():
i.save()
i.save(force_update=True)
for o in w.outputs.all():
o.save()
\ No newline at end of file
o.save(force_update=True)
w.save(force_update=True)
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
......@@ -21,4 +21,4 @@ class HarfLearner(orngRFCons.RandomForestLearner):
self.learner
class HarfClassifier(orngRFCons.RandomForestClassifier):
#class HarfClassifier(orngRFCons.RandomForestClassifier):
......@@ -19,6 +19,8 @@ if USE_CONCURRENCY:
from workflows.tasks import executeWidgetFunction, executeWidgetProgressBar, executeWidgetStreaming, executeWidgetWithRequest, runWidget, executeWidgetPostInteract
from workflows.engine import WidgetRunner, WorkflowRunner
class WidgetException(Exception):
pass
......@@ -794,16 +796,9 @@ class Widget(models.Model):
""" else run abstract widget function """
outputs = function_to_call(input_dict)
else:
if self.workflow_link.is_for_loop():
""" if this is object is a for loop than true and run;
else false and run workflow """
#print("proper_run_is_for_loop")
self.workflow_link.run_for_loop()
#print self.outputs.all()[0].value
elif self.workflow_link.is_cross_validation():
self.workflow_link.run_cross_validation()
else:
self.workflow_link.run()
wr = WidgetRunner(self,workflow_runner=WorkflowRunner(self.workflow,clean=False),standalone=True)
wr.run()
return
except:
self.error=True
self.running=False
......@@ -885,16 +880,23 @@ class Widget(models.Model):
elif self.type == 'cv_output':
""" if object is an output widget for cross validation,
then read output values and configure parameters"""
print "smo v cv_output"
for i in self.inputs.all():
if not i.parameter:
""" if there is a connection than true and read the output value """
if i.connections.count() > 0:
if i.value is None:
print "1"
i.value = [i.connections.all()[0].output.value]
else:
print "2"
i.value = [i.connections.all()[0].output.value] + i.value
print i.value
#print i.value
i.save()
print "----"
print i.outer_output.value
print "----"
i.outer_output.value.append(i.value)
i.outer_output.save()
self.finished=True
......
......@@ -6,11 +6,46 @@ Replace this with more appropriate tests for your application.
"""
from django.test import TestCase
from workflows.engine import WorkflowRunner, WidgetRunner
from workflows.models import Workflow, Widget
import time
class WorkflowEngineTest(TestCase):
fixtures = ['test_data',]
def test_fast_workflow_runner(self):
w = Workflow.objects.get(name='For loop test')
wr = WorkflowRunner(w)
wr.run()
wid = Widget.objects.get(id=6)
o = wid.outputs.all()[0].value
self.assertEqual(o,[20,40,60,80])
class SimpleTest(TestCase):
def test_basic_addition(self):
"""
Tests that 1 + 1 always equals 2.
"""
self.assertEqual(1 + 1, 2)
def test_fast_workflow_runner_cv(self):
w = Workflow.objects.get(name='Cross test')
wr = WorkflowRunner(w)
wr.run()
wid = Widget.objects.get(id=16)
o = wid.outputs.all()[0].value
self.assertEqual(o,[[[[u'2'], [u'1']], [u'3'], 1],
[[[u'3'], [u'1']], [u'2'], 1],
[[[u'3'], [u'2']], [u'1'], 1]])
class WidgetEngineTest(TestCase):
fixtures = ['test_data2',]
def test_fast_widget_runner(self):
w = Widget.objects.get(id=6)
wr = WidgetRunner(w,workflow_runner=WorkflowRunner(w.workflow,clean=False),standalone=True)
wr.run()
wid = Widget.objects.get(id=6)
o = wid.outputs.all()[0].value
self.assertEqual(o,[20,40,60,80])
def test_fast_widget_runner_cv(self):
w = Widget.objects.get(id=16)
wr = WidgetRunner(w,workflow_runner=WorkflowRunner(w.workflow,clean=False),standalone=True)
wr.run()
wid = Widget.objects.get(id=16)
o = wid.outputs.all()[0].value
self.assertEqual(o,[[[[u'2'], [u'1']], [u'3'], 1],
[[[u'3'], [u'1']], [u'2'], 1],
[[[u'3'], [u'2']], [u'1'], 1]])
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment