Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
7
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
Alain Shakour
clowdflows
Commits
c22245e8
Commit
c22245e8
authored
Jun 04, 2013
by
Janez K
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
dodal haltstream exception
parent
5fa82106
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
49 additions
and
41 deletions
+49
-41
mothra/settings.py
mothra/settings.py
+4
-4
streams/management/commands/run_streams.py
streams/management/commands/run_streams.py
+1
-1
streams/models.py
streams/models.py
+35
-32
workflows/streaming/library.py
workflows/streaming/library.py
+9
-4
No files found.
mothra/settings.py
View file @
c22245e8
...
...
@@ -186,10 +186,10 @@ INSTALLED_APPS_CONCUR = (
INSTALLED_APPS_WORKFLOWS_SUB
=
(
'workflows.base'
,
'workflows.latino'
,
#
'workflows.latino',
'workflows.decision_support'
,
'workflows.segmine'
,
'workflows.subgroup_discovery'
,
#
'workflows.subgroup_discovery',
'workflows.nlp'
,
'workflows.nl_toolkit'
,
'workflows.ilp'
,
...
...
@@ -197,8 +197,8 @@ INSTALLED_APPS_WORKFLOWS_SUB = (
'workflows.cforange'
,
'workflows.perfeval'
,
'workflows.mysql'
,
'workflows.lemmagen'
,
'workflows.crossbee'
,
#
'workflows.lemmagen',
#
'workflows.crossbee',
'workflows.streaming'
,
#WORKFLOWS_SUBAPP_PLACEHOLDER
)
...
...
streams/management/commands/run_streams.py
View file @
c22245e8
...
...
@@ -16,4 +16,4 @@ class Command(NoArgsCommand):
stream
.
last_executed
=
now
stream
.
save
()
print
stream
.
execute
()
streams/models.py
View file @
c22245e8
...
...
@@ -5,6 +5,9 @@ import workflows.library
from
picklefield.fields
import
PickledObjectField
class
HaltStream
(
Exception
):
pass
# Create your models here.
class
Stream
(
models
.
Model
):
...
...
@@ -13,13 +16,13 @@ class Stream(models.Model):
last_executed
=
models
.
DateTimeField
(
auto_now_add
=
True
)
period
=
models
.
IntegerField
(
default
=
60
)
active
=
models
.
BooleanField
(
default
=
False
)
def
execute
(
self
,
workflow
=
None
,
outputs
=
{}):
if
workflow
is
None
:
workflow
=
self
.
workflow
ready_to_run
=
[]
widgets
=
workflow
.
widgets
.
all
()
#get unfinished
if
workflow
.
is_for_loop
():
fi
=
workflow
.
widgets
.
filter
(
type
=
'for_input'
)[
0
]
...
...
@@ -34,10 +37,10 @@ class Stream(models.Model):
input_list
=
[]
else
:
input_list
=
[
0
]
#print input_list
for
for_input
in
input_list
:
#print for_input
finished
=
[]
...
...
@@ -48,9 +51,9 @@ class Stream(models.Model):
for
w
in
unfinished_list
:
# prepare all the inputs for this widget
input_dict
=
{}
output_dict
=
{}
output_dict
=
{}
finish
=
True
for
i
in
w
.
inputs
.
all
():
#gremo pogledat ce obstaja povezava in ce obstaja gremo value prebrat iz outputa
if
not
i
.
parameter
:
...
...
@@ -65,25 +68,25 @@ class Stream(models.Model):
if
not
i
.
variable
in
input_dict
:
input_dict
[
i
.
variable
]
=
[]
if
not
i
.
value
==
None
:
input_dict
[
i
.
variable
].
append
(
i
.
value
)
input_dict
[
i
.
variable
].
append
(
i
.
value
)
if
w
.
type
==
'regular'
:
function_to_call
=
getattr
(
workflows
.
library
,
w
.
abstract_widget
.
action
)
if
w
.
abstract_widget
.
wsdl
!=
''
:
input_dict
[
'wsdl'
]
=
w
.
abstract_widget
.
wsdl
input_dict
[
'wsdl_method'
]
=
w
.
abstract_widget
.
wsdl_method
try
:
if
w
.
abstract_widget
.
has_progress_bar
:
output_dict
=
function_to_call
(
input_dict
,
w
)
elif
w
.
abstract_widget
.
is_streaming
:
output_dict
=
function_to_call
(
input_dict
,
w
,
self
)
else
:
output_dict
=
function_to_call
(
input_dict
)
except
:
halted
.
append
(
w
)
finish
=
False
try
:
if
w
.
abstract_widget
.
has_progress_bar
:
output_dict
=
function_to_call
(
input_dict
,
w
)
elif
w
.
abstract_widget
.
is_streaming
:
output_dict
=
function_to_call
(
input_dict
,
w
,
self
)
else
:
output_dict
=
function_to_call
(
input_dict
)
except
HaltStream
:
halted
.
append
(
w
)
finish
=
False
if
w
.
type
==
'subprocess'
:
new_outputs
=
self
.
execute
(
workflow
=
w
.
workflow_link
,
outputs
=
outputs
)
for
o
in
w
.
outputs
.
all
():
...
...
@@ -91,18 +94,18 @@ class Stream(models.Model):
outputs
[
o
.
pk
]
=
new_outputs
[
o
.
pk
]
except
:
outputs
[
o
.
pk
]
=
(
o
.
variable
,
None
)
if
w
.
type
==
'for_input'
:
for
o
in
w
.
outputs
.
all
():
outputs
[
o
.
pk
]
=
(
o
.
variable
,
for_input
)
#print outputs[o.pk]
output_dict
[
o
.
variable
]
=
for_input
if
w
.
type
==
'for_output'
:
for
i
in
w
.
inputs
.
all
():
outputs
[
i
.
outer_output
.
pk
][
1
].
append
(
input_dict
[
i
.
variable
])
output_dict
[
i
.
variable
]
=
input_dict
[
i
.
variable
]
if
w
.
type
==
'input'
:
for
o
in
w
.
outputs
.
all
():
value
=
None
...
...
@@ -112,17 +115,17 @@ class Stream(models.Model):
except
:
value
=
None
output_dict
[
o
.
variable
]
=
value
if
finish
:
if
w
.
type
==
'output'
:
for
i
in
w
.
inputs
.
all
():
outputs
[
i
.
outer_output
.
pk
]
=
(
i
.
outer_output
.
variable
,
input_dict
[
i
.
variable
])
if
w
.
type
!=
'subprocess'
:
for
o
in
w
.
outputs
.
all
():
outputs
[
o
.
pk
]
=
(
o
.
variable
,
output_dict
[
o
.
variable
])
finished
.
append
(
w
.
pk
)
unfinished_list
=
[]
for
w
in
widgets
:
...
...
@@ -139,14 +142,14 @@ class Stream(models.Model):
if
len
(
unfinished_list
)
==
0
:
loop
=
False
return
outputs
def
__unicode__
(
self
):
return
unicode
(
self
.
workflow
)
+
' stream'
class
StreamWidgetData
(
models
.
Model
):
stream
=
models
.
ForeignKey
(
Stream
,
related_name
=
"widget_data"
)
widget
=
models
.
ForeignKey
(
Widget
,
related_name
=
"stream_data"
)
value
=
PickledObjectField
(
null
=
True
)
\ No newline at end of file
workflows/streaming/library.py
View file @
c22245e8
...
...
@@ -4,6 +4,10 @@ Streaming widgets librarby
@author: Janez Kranjc <janez.kranjc@ijs.si>
'''
def
streaming_twitter
(
input_dict
,
widget
,
stream
=
None
):
output_dict
=
{}
return
output_dict
def
streaming_rss_reader
(
input_dict
,
widget
,
stream
=
None
):
import
feedparser
from
streams.models
import
StreamWidgetData
...
...
@@ -25,12 +29,13 @@ def streaming_rss_reader(input_dict,widget,stream=None):
feed_length
=
len
(
feed
[
'items'
])
feed
[
'items'
].
reverse
()
for
item
in
feed
[
'items'
]:
if
item
[
'
id
'
]
not
in
data
:
data
.
append
(
item
[
'
id
'
])
if
item
[
'
link
'
]
not
in
data
:
data
.
append
(
item
[
'
link
'
])
swd
.
value
=
data
swd
.
save
()
output_dict
[
'url'
]
=
item
[
'link'
]
break
else
:
raise
Exception
(
"Halting stream."
)
return
output_dict
\ No newline at end of file
from
streams.models
import
HaltStream
raise
HaltStream
(
"Halting stream."
)
return
output_dict
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment