# engine.py
import workflows.library
import time
3
import random
4

# Runner for a single widget.
class WidgetRunner():
    """Runs one widget on behalf of a workflow runner.

    The runner reads the widget's inputs (pulling values from connected
    outputs held by ``workflow_runner``), performs the action that matches
    ``widget.type`` and stores the results on the widget's outputs.
    """

    def __init__(self, widget, workflow_runner, standalone=False):
        """Bind the runner to ``widget`` and its ``workflow_runner``.

        widget          -- object exposing ``id``, ``type``, ``inputs`` and
                           ``outputs`` (the latter two provide ``.all()``).
        workflow_runner -- runner that owns the widget; provides ``widgets``,
                           ``outputs``, ``parent`` and
                           ``get_connection_for_input``.
        standalone      -- when true, the widget instance held by
                           ``workflow_runner.widgets`` (matched by id) is used
                           instead of ``widget`` and the result is saved at
                           the end of ``run``.

        Raises ``Exception`` when ``standalone`` is true and the widget is
        not among ``workflow_runner.widgets``.
        """
        self.widget = widget
        self.inputs = widget.inputs.all()
        # NOTE(review): singular attribute name kept as-is for compatibility;
        # nothing in this class reads it.
        self.output = widget.outputs.all()
        self.workflow_runner = workflow_runner
        self.inner_workflow_runner = None
        self.standalone = standalone
        if self.standalone:
            # Swap in the instance owned by the workflow runner so that state
            # changes land on the object the runner iterates over.
            for w in self.workflow_runner.widgets:
                if w.id == self.widget.id:
                    self.widget = w
                    break
            else:
                raise Exception("this shouldn't happen!")

    def run(self):
        """Execute the widget according to its type and mark it finished.

        'regular' and 'subprocess' widgets call their library function or run
        the linked inner workflow; 'input' widgets copy values from the parent
        runner's inputs; 'output' widgets write to the parent runner's
        outputs; 'for_output' and 'cv_output' widgets append to them. Any
        other type is only flagged as finished.

        Exceptions raised while running a regular/subprocess widget are
        re-raised after the widget has been flagged as errored.
        """
        self.widget.running = True
        widget_type = self.widget.type
        if widget_type == 'regular' or widget_type == 'subprocess':
            self._run_function_or_subprocess()
        elif widget_type == 'input':
            for o in self.widget.outputs.all():
                o.value = self.workflow_runner.parent.inputs[o.outer_input_id].value
        elif widget_type == 'output':
            # Called for its side effect: it refreshes every input's value
            # from the connected output.
            self.get_input_dictionary()
            for i in self.widget.inputs.all():
                self.workflow_runner.parent.outputs[i.outer_output_id].value = i.value
        elif widget_type == 'for_output' or widget_type == 'cv_output':
            # Same refresh as above; loop outputs accumulate one value per
            # iteration in the parent's output list.
            self.get_input_dictionary()
            for i in self.widget.inputs.all():
                self.workflow_runner.parent.outputs[i.outer_output_id].value.append(i.value)

        self.widget.running = False
        self.widget.error = False
        self.widget.finished = True
        if self.standalone:
            self.save()

    def _run_function_or_subprocess(self):
        """Run a 'regular' or 'subprocess' widget and assign its outputs."""
        abstract_widget = self.widget.abstract_widget
        function_to_call = None
        if abstract_widget:
            function_to_call = getattr(workflows.library, abstract_widget.action)
        input_dict = self.get_input_dictionary()
        outputs = {}
        start = time.time()
        try:
            if abstract_widget:
                if abstract_widget.wsdl != '':
                    # The wsdl settings live on the abstract widget; the
                    # runner itself has no ``abstract_widget`` attribute.
                    input_dict['wsdl'] = abstract_widget.wsdl
                    input_dict['wsdl_method'] = abstract_widget.wsdl_method
                if abstract_widget.has_progress_bar:
                    outputs = function_to_call(input_dict, self.widget)
                elif abstract_widget.is_streaming:
                    outputs = function_to_call(input_dict, self.widget, None)
                else:
                    outputs = function_to_call(input_dict)
            else:
                # No abstract widget: run the linked workflow as a subprocess.
                self.inner_workflow_runner = WorkflowRunner(
                    self.widget.workflow_link, parent=self.workflow_runner)
                self.inner_workflow_runner.run()
        except:
            # Flag the failure on the widget, then let the original exception
            # propagate unchanged.
            self.widget.error = True
            self.widget.running = False
            self.widget.finished = False
            raise
        elapsed = (time.time() - start)
        outputs['clowdflows_elapsed'] = elapsed
        self.assign_outputs(outputs)

    def assign_outputs(self, outputs):
        """Copy values from ``outputs`` onto the widget's outputs.

        ``outputs`` is indexed by each output's ``variable``; outputs whose
        variable cannot be looked up keep their current value.
        """
        for o in self.widget.outputs.all():
            try:
                o.value = outputs[o.variable]
            except (LookupError, TypeError):
                # Best effort: the function did not provide this output.
                pass

    def get_input_dictionary(self):
        """Return a dict mapping input variable names to input values.

        As a side effect, every non-parameter input gets its ``value``
        refreshed from the output it is connected to (``None`` when it has no
        connection). Inputs with ``multi_id == 0`` map to a single value; the
        others are collected into a list under the shared variable name,
        skipping ``None`` values.
        """
        input_dictionary = {}
        for i in self.widget.inputs.all():
            # If this isn't a parameter we need to fetch it from the output.
            if not i.parameter:
                connection = self.workflow_runner.get_connection_for_input(i)
                if connection:
                    i.value = self.workflow_runner.outputs[connection.output_id].value
                else:
                    i.value = None
            # Here we assign the value to the dictionary.
            if i.multi_id == 0:
                input_dictionary[i.variable] = i.value
            else:  # it's a multiple input
                if i.variable not in input_dictionary:
                    input_dictionary[i.variable] = []
                # Identity test: values may define their own ``__eq__``.
                if i.value is not None:
                    input_dictionary[i.variable].append(i.value)
        return input_dictionary

    def save(self):
        """Save every input, every output and then the widget itself."""
        for i in self.widget.inputs.all():
            i.save()
        for o in self.widget.outputs.all():
            o.save()
        self.widget.save()

# Runner for a whole workflow.
class WorkflowRunner():
    """Runs all widgets of a workflow in dependency order.

    A workflow that contains a 'for_input' widget is run once per item of
    the corresponding parent input; one that contains a 'cv_input' widget is
    run once per cross-validation fold. Any other workflow is run once.
    """

    def __init__(self, workflow, clean=True, parent=None):
        """Collect the workflow's connections, widgets, inputs and outputs.

        workflow -- object exposing ``connections`` and ``widgets`` managers.
        clean    -- when true, ``cleanup`` resets every widget's ``finished``
                    flag so that all widgets are run again.
        parent   -- runner of the enclosing workflow, or None; its ``inputs``
                    and ``outputs`` are read/written by loop workflows.
        """
        self.workflow = workflow
        self.connections = workflow.connections.all()
        # presumably a Django queryset: related rows are loaded up front
        self.widgets = workflow.widgets.all().select_related('abstract_widget').prefetch_related('inputs', 'outputs')
        # id -> object lookups across all widgets of this workflow
        self.inputs = {}
        self.outputs = {}
        for w in self.widgets:
            for i in w.inputs.all():
                self.inputs[i.id] = i
            for o in w.outputs.all():
                self.outputs[o.id] = o
        self.clean = clean
        self.parent = parent

    def is_for_loop(self):
        """Return True if the workflow contains a 'for_input' widget."""
        return any(w.type == 'for_input' for w in self.widgets)

    def is_cross_validation(self):
        """Return True if the workflow contains a 'cv_input' widget."""
        return any(w.type == 'cv_input' for w in self.widgets)

    def cleanup(self):
        """Clear every widget's error flag and, if ``clean``, its finished flag."""
        for w in self.widgets:
            if self.clean:
                w.finished = False
            w.error = False

    def get_connection_for_output(self, output):
        """Return the first connection leaving ``output``, or None."""
        for c in self.connections:
            if c.output_id == output.id:
                return c
        return None

    def get_connection_for_input(self, input):
        """Return the first connection entering ``input``, or None."""
        for c in self.connections:
            if c.input_id == input.id:
                return c
        return None

    @property
    def finished_widgets(self):
        """List of widgets whose ``finished`` flag is set."""
        return [w for w in self.widgets if w.finished]

    @property
    def unfinished_widgets(self):
        """List of widgets that are neither finished, running nor errored."""
        return [w for w in self.widgets
                if not w.finished and not w.running and not w.error]

    @property
    def runnable_widgets(self):
        """List of unfinished widgets whose upstream widgets are all finished.

        A widget is runnable if every widget that has an output connected to
        one of this widget's inputs is finished.
        """
        unfinished = self.unfinished_widgets
        if not unfinished:
            return []
        finished_widget_ids = set(w.id for w in self.finished_widgets)
        unfinished_ids = set(w.id for w in unfinished)
        # One pass over the connections: remember every unfinished widget that
        # is fed by a widget which has not finished yet.
        blocked_ids = set()
        for c in self.connections:
            target_id = self.inputs[c.input_id].widget_id
            if (target_id in unfinished_ids
                    and self.outputs[c.output_id].widget_id not in finished_widget_ids):
                blocked_ids.add(target_id)
        return [w for w in unfinished if w.id not in blocked_ids]

    def run_all_unfinished_widgets(self):
        """Run runnable widgets, batch after batch, until none are left."""
        runnable_widgets = self.runnable_widgets
        while len(runnable_widgets) > 0:
            for w in runnable_widgets:
                wr = WidgetRunner(w, self)
                wr.run()
            runnable_widgets = self.runnable_widgets

    def run(self):
        """Run the workflow (once, per item, or per fold) and save it."""
        self.cleanup()
        if self.is_for_loop():
            self._run_for_loop()
        elif self.is_cross_validation():
            self._run_cross_validation()
        else:
            self.run_all_unfinished_widgets()
        self.save()

    def _find_loop_widgets(self, input_type, output_type):
        """Return the last widgets of the two given types (None if absent)."""
        fi = None
        fo = None
        for w in self.widgets:
            if w.type == input_type:
                fi = w
            if w.type == output_type:
                fo = w
        return fi, fo

    def _run_for_loop(self):
        """Run the workflow once for every item of the parent's input list."""
        fi, fo = self._find_loop_widgets('for_input', 'for_output')
        outer_output = self.parent.outputs[fo.inputs.all()[0].outer_output_id]
        # The for_output widget appends one result per iteration to this list.
        outer_output.value = []
        input_list = self.parent.inputs[fi.outputs.all()[0].outer_input_id].value
        for item in input_list:
            self.cleanup()
            proper_output = fi.outputs.all()[0]
            proper_output.value = item
            fi.finished = True
            self.run_all_unfinished_widgets()

    def _make_cv_folds(self, input_list, input_fold, input_seed):
        """Split ``input_list`` into ``input_fold`` folds.

        Returns ``(folds, is_table)``. For objects that have a
        ``get_items_ref`` attribute the ``orange`` module builds stratified
        ``(train, test)`` pairs and ``is_table`` is True. Otherwise a shuffled
        copy of the sequence is dealt into ``input_fold`` slices and
        ``is_table`` is False.
        """
        if hasattr(input_list, "get_items_ref"):
            import orange
            indices = orange.MakeRandomIndicesCV(input_list, randseed=input_seed, folds=input_fold, stratified=orange.MakeRandomIndices.Stratified)
            folds = []
            for i in range(input_fold):
                output_train = input_list.select(indices, i, negate=1)
                output_test = input_list.select(indices, i)
                output_train.name = input_list.name
                output_test.name = input_list.name
                folds.append((output_train, output_test))
            return folds, True
        random.seed(input_seed)
        # Shuffle a copy: the list belongs to the parent workflow's input and
        # must not be reordered in place.
        shuffled = input_list[:]
        random.shuffle(shuffled)
        return [shuffled[i::input_fold] for i in range(input_fold)], False

    def _run_cross_validation(self):
        """Run the workflow once per cross-validation fold.

        The data, the number of folds (default 10) and the random seed
        (default: a random integer) are read from the parent inputs linked to
        the cv_input widget's first, second and third output. The seed that
        was used is written to the third output.

        Raises ``Exception`` when the input data is empty.
        """
        fi, fo = self._find_loop_widgets('cv_input', 'cv_output')
        outer_output = self.parent.outputs[fo.inputs.all()[0].outer_output_id]
        # The cv_output widget appends one result per fold to this list.
        outer_output.value = []
        input_list = self.parent.inputs[fi.outputs.all()[0].outer_input_id].value
        input_fold = self.parent.inputs[fi.outputs.all()[1].outer_input_id].value
        input_seed = self.parent.inputs[fi.outputs.all()[2].outer_input_id].value
        if input_fold is not None:
            input_fold = int(input_fold)
        else:
            input_fold = 10

        if input_seed is not None:
            input_seed = int(input_seed)
        else:
            input_seed = random.randint(0, 10**9)

        # A 'DBContext' wraps several tables; only its target table is split.
        input_type = input_list.__class__.__name__
        context = None
        if input_type == 'DBContext':
            context = input_list
            input_list = context.orng_tables.get(context.target_table, None)

        if not input_list:
            raise Exception('CrossValidation: Empty input list!')

        folds, is_table = self._make_cv_folds(input_list, input_fold, input_seed)

        proper_output = fi.outputs.all()[2]
        proper_output.value = input_seed

        for i, fold in enumerate(folds):
            if is_table:
                output_train, output_test = fold
            else:
                # NOTE(review): the training value is a list of the remaining
                # folds (not flattened) - confirm downstream widgets expect it.
                output_train = folds[:i] + folds[i+1:]
                output_test = fold
            if input_type == 'DBContext':
                output_train_obj = context.copy()
                output_train_obj.orng_tables[context.target_table] = output_train
                output_test_obj = context.copy()
                output_test_obj.orng_tables[context.target_table] = output_test
                output_train = output_train_obj
                output_test = output_test_obj

            self.cleanup()
            proper_output = fi.outputs.all()[0]  # inner output
            proper_output.value = output_train
            proper_output = fi.outputs.all()[1]  # inner output
            proper_output.value = output_test
            fi.finished = True  # set the input widget as finished
            self.run_all_unfinished_widgets()

    def save(self):
        """Save every widget with its inputs and outputs (``force_update=True``)."""
        for w in self.widgets:
            for i in w.inputs.all():
                i.save(force_update=True)
            for o in w.outputs.all():
                o.save(force_update=True)
            w.save(force_update=True)