Workflow Engine in Python

- - posted in Architecture - tagged by python, spiffworkflow, workflow | Comments

1,What’s workflow

A workflow consists of a sequence of connected steps where each step follows without delay or gap and ends just before the subsequent step may begin.

it’s just a complicated finite statemachine

2,Spiff workflow

Spiff Workflow is a library implementing a framework for workflows. It is based on http://www.workflowpatterns.com and implemented in pure Python.

In addition, Spiff Workflow provides a parser and workflow emulation layer that can be used to create executable Spiff Workflow specifications from Business Process Model and Notation (BPMN) documents.

For documentation please refer to:

https://github.com/knipknap/SpiffWorkflow/wiki

3,How to make spiff workflow work

There are many test cases in test directory, you would know how it works, at least know most parts.

Generally, it can work like this:

  • a, template ,which define the workflow based on workflow pattern
  • b, tools which can transform the template to real object in spiff object
  • c, engine, which is used to make spiff workflow object work like flow,lol

However,there is some trap in it. you can make it work by function,complete_all() or complete_next() but you can not know how the flow works by getting ready task, the right way is by callback function

ps: I was stuck by this for two or three days.

4, one better example using spiff workflow

the case code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding: utf-8

import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__),  '../../lib'))

from SpiffWorkflow.specs import *
from SpiffWorkflow import Task, Workflow
from SpiffWorkflow.storage import XmlSerializer


def on_entered_cb(workflow, task, taken_path):
    #print "entered:",task.get_name()
    return True

def on_ready_cb(workflow, task, taken_path):
    #print "ready:",task.get_name()
    return True


def on_reached_cb(workflow, task, taken_path):
    #print "reached:",task.get_name()
    return True

def on_complete_cb(workflow, task, taken_path):
    # Record the path.
    print "complete:",task.get_name()
    #print task.get_description()
    indent = '  ' * (task._get_depth() - 1)
    taken_path.append('%s%s' % (indent, task.get_name()))
    return True



class QuestionError(Exception):

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)




class QuestionWorkflow(object):
    def __init__(self):
        self.serializer = XmlSerializer()

    def set_up(self,filename):
        # Test patterns that are defined in XML format.
        xml = open(filename).read()
        self.wf_spec = WorkflowSpec.deserialize(XmlSerializer(), xml, filename = filename)
        self.taken_path = self.track_workflow(self.wf_spec)
        self.workflow   = Workflow(self.wf_spec)


    def run(self, UserSelection, restart=False):

        if restart:
            self.workflow = Workflow(self.wf_spec)

        workflow = self.workflow
        condition_keys = []
        if UserSelection is None:
            UserSelection = {}

        task_data_dict = UserSelection.copy()

        while not workflow.is_completed():
            tasks = workflow.get_tasks(Task.READY)

            for t in tasks:
                print "Ready:", t.task_spec.name
                if  hasattr(t.task_spec, "cond_task_specs"):
                    for cond, name in t.task_spec.cond_task_specs:
                        for cond_unit in cond.args:
                            if hasattr(cond_unit, "name"):
                                condition_keys.append(cond_unit.name)


            flag_keys_in_user_select = True
            for cond_key in condition_keys:
                if not task_data_dict.has_key(cond_key):
                    print cond_key
                    flag_keys_in_user_select = False
                    break

            if not flag_keys_in_user_select:
                # some tast's condition's key not in input userselect dict
                return

            for t in tasks:
                t.set_data(**task_data_dict)

            workflow.complete_next()

        if not workflow.is_completed():
            raise QuestionError('invalid feature[%s]' % filename)

    def print_trace(self):
        path = '\n'.join(self.taken_path) + '\n'
        info = ""
        info += 'the workflowrun path:\n'
        info += '%s\n' % path
        print info


    def track_task(self, task_spec, taken_path):

        #reached event call back
        if task_spec.reached_event.is_connected(on_reached_cb):
            task_spec.reached_event.disconnect(on_reached_cb)
        task_spec.reached_event.connect(on_reached_cb, taken_path)

        #completed event call back
        if task_spec.completed_event.is_connected(on_complete_cb):
            task_spec.completed_event.disconnect(on_complete_cb)
        task_spec.completed_event.connect(on_complete_cb, taken_path)

        #enter event call back
        if task_spec.entered_event.is_connected(on_entered_cb):
            task_spec.entered_event.disconnect(on_entered_cb)
        task_spec.entered_event.connect(on_entered_cb, taken_path)

        #ready event call back
        if task_spec.ready_event.is_connected(on_ready_cb):
            task_spec.ready_event.disconnect(on_ready_cb)
        task_spec.ready_event.connect(on_ready_cb, taken_path)



    def track_workflow(self, wf_spec, taken_path = None):
        if taken_path is None:
            taken_path = []
        for name in wf_spec.task_specs:
            #print "track_workflow:",name
            self.track_task(wf_spec.task_specs[name], taken_path)
        return taken_path

if __name__ == '__main__':
    qw = QuestionWorkflow()
    qw.set_up("./case.xml")
    print "==========1st question=========="
    user_selct = {'man':'1'}
    qw.run(user_selct)
    print "==========2nd question=========="
    user_selct = {'man':'1', 'house': '2'}
    qw.run(user_selct)
    print "==========3rd question=========="
    user_selct = {'man':'1', 'house': '2', 'why': 'because you are a hero'}
    qw.run(user_selct)

    '''
    print "==========4th question========="
    user_selct = {'man':'1', 'house': '2', 'role':'5'}
    qw.run(user_selct)
    '''

    print "==========fix some question=========="
    user_selct = {'man':'1', 'house': '1', 'role':'5'}
    qw.run(user_selct,True)

    print

the template case.xml file

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<?xml version="1.0" encoding="UTF-8"?>
<process-definition name="Daixm Workflow" revision="1.6">
    <description>
    A test workflow that contains all possible tasks.
    </description>

    <!-- Start with an implicit simple split. -->
    <start-task>
        <successor>house_choice</successor>
    </start-task>

    <exclusive-choice name="house_choice">
        <!-- No House -->
        <default-successor>role_choice</default-successor>

        <conditional-successor>
            <equals left-field="house" right-value="1" />
            <successor>build_house</successor>
        </conditional-successor>

        <conditional-successor>
            <equals left-field="house" right-value="2" />
            <successor>buy_house</successor>
        </conditional-successor>
    </exclusive-choice>

    <task name="buy_house">
        <description></description>
        <successor>house_location</successor>
    </task>

    <task name="house_location">
        <successor>house_age</successor>
    </task>

    <task name="house_age">
        <successor>role_choice</successor>
    </task>

    <task name="build_house">
        <successor>role_choice</successor>
    </task>


    <exclusive-choice name="role_choice">
        <!-- No House -->
        <default-successor>no_jobs</default-successor>
        <conditional-successor>
            <equals left-field="role" right-value="1" />
            <successor>jobs_working</successor>
        </conditional-successor>
    </exclusive-choice>


    <task name="no_jobs">
        <successor>last</successor>
    </task>

    <task name="jobs_working">
        <successor>last</successor>
    </task>

    <!-- A final task. -->
    <task name="last">
        <successor>end</successor>
    </task>
</process-definition>

Comments