diff --git a/mothra/settings.py b/mothra/settings.py index 5928e517ad77cf97b483c04afa1037fa7beaee27..c2196eabb3ad993c0ad18a7a17d569bb79190c8e 100755 --- a/mothra/settings.py +++ b/mothra/settings.py @@ -186,10 +186,10 @@ INSTALLED_APPS_CONCUR = ( INSTALLED_APPS_WORKFLOWS_SUB = ( 'workflows.base', - 'workflows.latino', + #'workflows.latino', 'workflows.decision_support', 'workflows.segmine', - 'workflows.subgroup_discovery', + #'workflows.subgroup_discovery', 'workflows.nlp', 'workflows.nl_toolkit', 'workflows.ilp', @@ -197,8 +197,8 @@ INSTALLED_APPS_WORKFLOWS_SUB = ( 'workflows.cforange', 'workflows.perfeval', 'workflows.mysql', - 'workflows.lemmagen', - 'workflows.crossbee', + #'workflows.lemmagen', + #'workflows.crossbee', 'workflows.streaming', #WORKFLOWS_SUBAPP_PLACEHOLDER ) diff --git a/streams/management/commands/run_streams.py b/streams/management/commands/run_streams.py index 778ebe4ca82a62581ad95a8b294da84c75989a48..b495cdb4f612d7632f2513d452adc035a7353b79 100755 --- a/streams/management/commands/run_streams.py +++ b/streams/management/commands/run_streams.py @@ -16,4 +16,4 @@ class Command(NoArgsCommand): stream.last_executed = now stream.save() print stream.execute() - + diff --git a/streams/models.py b/streams/models.py index a8e4e11e3451c257eaf0e97696e840e06f5ca268..4a28fc01a9e8881e9306fd3171f6a25d6da31c14 100644 --- a/streams/models.py +++ b/streams/models.py @@ -5,6 +5,9 @@ import workflows.library from picklefield.fields import PickledObjectField +class HaltStream(Exception): + pass + # Create your models here. class Stream(models.Model): @@ -13,13 +16,13 @@ class Stream(models.Model): last_executed = models.DateTimeField(auto_now_add=True) period = models.IntegerField(default=60) active = models.BooleanField(default=False) - + def execute(self,workflow=None,outputs={}): if workflow is None: workflow = self.workflow ready_to_run = [] widgets = workflow.widgets.all() - + #get unfinished if workflow.is_for_loop(): fi = workflow.widgets.filter(type='for_input')[0] @@ -34,10 +37,10 @@ class Stream(models.Model): input_list = [] else: input_list = [0] - - + + #print input_list - + for for_input in input_list: #print for_input finished = [] @@ -48,9 +51,9 @@ class Stream(models.Model): for w in unfinished_list: # prepare all the inputs for this widget input_dict = {} - output_dict = {} + output_dict = {} finish = True - + for i in w.inputs.all(): #gremo pogledat ce obstaja povezava in ce obstaja gremo value prebrat iz outputa if not i.parameter: @@ -65,25 +68,25 @@ class Stream(models.Model): if not i.variable in input_dict: input_dict[i.variable]=[] if not i.value==None: - input_dict[i.variable].append(i.value) - + input_dict[i.variable].append(i.value) + if w.type == 'regular': function_to_call = getattr(workflows.library,w.abstract_widget.action) if w.abstract_widget.wsdl != '': input_dict['wsdl']=w.abstract_widget.wsdl input_dict['wsdl_method']=w.abstract_widget.wsdl_method - try: - if w.abstract_widget.has_progress_bar: - output_dict = function_to_call(input_dict,w) - elif w.abstract_widget.is_streaming: - output_dict = function_to_call(input_dict,w,self) - else: - output_dict = function_to_call(input_dict) - except: - halted.append(w) - finish=False + try: + if w.abstract_widget.has_progress_bar: + output_dict = function_to_call(input_dict,w) + elif w.abstract_widget.is_streaming: + output_dict = function_to_call(input_dict,w,self) + else: + output_dict = function_to_call(input_dict) + except HaltStream: + halted.append(w) + finish=False + - if w.type == 'subprocess': new_outputs = self.execute(workflow=w.workflow_link,outputs=outputs) for o in w.outputs.all(): @@ -91,18 +94,18 @@ class Stream(models.Model): outputs[o.pk]=new_outputs[o.pk] except: outputs[o.pk]=(o.variable,None) - + if w.type == 'for_input': for o in w.outputs.all(): outputs[o.pk]=(o.variable,for_input) #print outputs[o.pk] output_dict[o.variable]=for_input - + if w.type == 'for_output': for i in w.inputs.all(): outputs[i.outer_output.pk][1].append(input_dict[i.variable]) output_dict[i.variable]=input_dict[i.variable] - + if w.type == 'input': for o in w.outputs.all(): value = None @@ -112,17 +115,17 @@ class Stream(models.Model): except: value = None output_dict[o.variable]=value - + if finish: if w.type == 'output': for i in w.inputs.all(): outputs[i.outer_output.pk]=(i.outer_output.variable,input_dict[i.variable]) - - + + if w.type != 'subprocess': for o in w.outputs.all(): outputs[o.pk]=(o.variable,output_dict[o.variable]) - + finished.append(w.pk) unfinished_list = [] for w in widgets: @@ -139,14 +142,14 @@ class Stream(models.Model): if len(unfinished_list)==0: loop = False return outputs - - + + def __unicode__(self): return unicode(self.workflow)+' stream' - + class StreamWidgetData(models.Model): stream = models.ForeignKey(Stream, related_name="widget_data") widget = models.ForeignKey(Widget, related_name="stream_data") value = PickledObjectField(null=True) - - \ No newline at end of file + + diff --git a/workflows/streaming/library.py b/workflows/streaming/library.py index 51a7c995f02deef797b99d6ea5238fc3a38348a7..6d582bb6b0bf7f42ab6069af1dd6c66f222e272e 100644 --- a/workflows/streaming/library.py +++ b/workflows/streaming/library.py @@ -4,6 +4,10 @@ Streaming widgets librarby @author: Janez Kranjc ''' +def streaming_twitter(input_dict,widget,stream=None): + output_dict = {} + return output_dict + def streaming_rss_reader(input_dict,widget,stream=None): import feedparser from streams.models import StreamWidgetData @@ -25,12 +29,13 @@ def streaming_rss_reader(input_dict,widget,stream=None): feed_length = len(feed['items']) feed['items'].reverse() for item in feed['items']: - if item['id'] not in data: - data.append(item['id']) + if item['link'] not in data: + data.append(item['link']) swd.value = data swd.save() output_dict['url'] = item['link'] break else: - raise Exception("Halting stream.") - return output_dict \ No newline at end of file + from streams.models import HaltStream + raise HaltStream("Halting stream.") + return output_dict