Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions model/event_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022 The Serverless Workflow Specification Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
"encoding/json"
)

// EventState used to wait for events from event sources, then consumes them and invoke one or more actions to run in sequence or parallel
type EventState struct {
// TODO: EventState doesn't have usedForCompensation field.
BaseState

// If true consuming one of the defined events causes its associated actions to be performed. If false all of the defined events must be consumed in order for actions to be performed
// Defaults to true
Exclusive bool `json:"exclusive,omitempty"`
// Define the events to be consumed and optional actions to be performed
OnEvents []OnEvents `json:"onEvents" validate:"required,min=1,dive"`
// State specific timeouts
Timeout *EventStateTimeout `json:"timeouts,omitempty"`
}

type eventStateForUnmarshal EventState

// UnmarshalJSON unmarshal EventState object from json bytes
func (e *EventState) UnmarshalJSON(data []byte) error {
v := eventStateForUnmarshal{
Exclusive: true,
}
err := json.Unmarshal(data, &v)
if err != nil {
return err
}

*e = EventState(v)
return nil
}

// OnEvents define which actions are be be performed for the one or more events.
type OnEvents struct {
// References one or more unique event names in the defined workflow events
EventRefs []string `json:"eventRefs" validate:"required,min=1"`
// Specifies how actions are to be performed (in sequence or parallel)
// Defaults to sequential
ActionMode ActionMode `json:"actionMode,omitempty" validate:"required,oneof=sequential parallel"`
// Actions to be performed if expression matches
Actions []Action `json:"actions,omitempty" validate:"omitempty,dive"`
// Event data filter
EventDataFilter EventDataFilter `json:"eventDataFilter,omitempty"`
}

type onEventsForUnmarshal OnEvents

// UnmarshalJSON unmarshal OnEvents object from json bytes
func (o *OnEvents) UnmarshalJSON(data []byte) error {
v := onEventsForUnmarshal{
ActionMode: ActionModeSequential,
}

err := json.Unmarshal(data, &v)
if err != nil {
return err
}

*o = OnEvents(v)
return nil
}

// EventStateTimeout defines timeout settings for event state
type EventStateTimeout struct {
StateExecTimeout StateExecTimeout `json:"stateExecTimeout,omitempty"`
ActionExecTimeout string `json:"actionExecTimeout,omitempty" validate:"omitempty,iso8601duration"`
EventTimeout string `json:"eventTimeout,omitempty" validate:"omitempty,iso8601duration"`
}
148 changes: 148 additions & 0 deletions model/event_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2022 The Serverless Workflow Specification Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
)

func TestEventStateUnmarshalJSON(t *testing.T) {
type testCase struct {
desp string
data string
expect EventState
err string
}
testCases := []testCase{
{
desp: "all fields set",
data: `{"name": "1", "Type": "event", "exclusive": false, "onEvents": [{"eventRefs": ["E1", "E2"], "actionMode": "parallel"}], "timeouts": {"actionExecTimeout": "PT5M", "eventTimeout": "PT5M", "stateExecTimeout": "PT5M"}}`,
expect: EventState{
BaseState: BaseState{
Name: "1",
Type: StateTypeEvent,
},
Exclusive: false,
OnEvents: []OnEvents{
{
EventRefs: []string{"E1", "E2"},
ActionMode: "parallel",
},
},
Timeout: &EventStateTimeout{
EventTimeout: "PT5M",
ActionExecTimeout: "PT5M",
StateExecTimeout: StateExecTimeout{
Total: "PT5M",
},
},
},
err: ``,
},
{
desp: "default exclusive",
data: `{"name": "1", "Type": "event", "onEvents": [{"eventRefs": ["E1", "E2"], "actionMode": "parallel"}], "timeouts": {"actionExecTimeout": "PT5M", "eventTimeout": "PT5M", "stateExecTimeout": "PT5M"}}`,
expect: EventState{
BaseState: BaseState{
Name: "1",
Type: StateTypeEvent,
},
Exclusive: true,
OnEvents: []OnEvents{
{
EventRefs: []string{"E1", "E2"},
ActionMode: "parallel",
},
},
Timeout: &EventStateTimeout{
EventTimeout: "PT5M",
ActionExecTimeout: "PT5M",
StateExecTimeout: StateExecTimeout{
Total: "PT5M",
},
},
},
err: ``,
},
}
for _, tc := range testCases {
t.Run(tc.desp, func(t *testing.T) {
v := EventState{}
err := json.Unmarshal([]byte(tc.data), &v)

if tc.err != "" {
assert.Error(t, err)
assert.Regexp(t, tc.err, err)
return
}

assert.NoError(t, err)
assert.Equal(t, tc.expect, v)
})
}
}

func TestOnEventsUnmarshalJSON(t *testing.T) {
type testCase struct {
desp string
data string
expect OnEvents
err string
}
testCases := []testCase{
{
desp: "all fields set",
data: `{"eventRefs": ["E1", "E2"], "actionMode": "parallel"}`,
expect: OnEvents{
EventRefs: []string{"E1", "E2"},
ActionMode: ActionModeParallel,
},
err: ``,
},
{
desp: "default action mode",
data: `{"eventRefs": ["E1", "E2"]}`,
expect: OnEvents{
EventRefs: []string{"E1", "E2"},
ActionMode: ActionModeSequential,
},
err: ``,
},
{
desp: "invalid object format",
data: `"eventRefs": ["E1", "E2"], "actionMode": "parallel"}`,
expect: OnEvents{},
err: `invalid character ':' after top-level value`,
},
}
for _, tc := range testCases {
t.Run(tc.desp, func(t *testing.T) {
v := OnEvents{}
err := json.Unmarshal([]byte(tc.data), &v)

if tc.err != "" {
assert.Error(t, err)
assert.Regexp(t, tc.err, err)
return
}

assert.NoError(t, err)
assert.Equal(t, tc.expect, v)
})
}
}
53 changes: 1 addition & 52 deletions model/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,58 +126,6 @@ func (s *BaseState) GetStateDataFilter() *StateDataFilter { return s.StateDataFi
// GetMetadata ...
func (s *BaseState) GetMetadata() *Metadata { return s.Metadata }

// EventState This state is used to wait for events from event sources, then consumes them and invoke one or more actions to run in sequence or parallel
type EventState struct {
BaseState
// If true consuming one of the defined events causes its associated actions to be performed. If false all of the defined events must be consumed in order for actions to be performed
Exclusive bool `json:"exclusive,omitempty"`
// Define the events to be consumed and optional actions to be performed
OnEvents []OnEvents `json:"onEvents" validate:"required,min=1,dive"`
// State specific timeouts
Timeout *EventStateTimeout `json:"timeouts,omitempty"`
}

// UnmarshalJSON ...
func (e *EventState) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &e.BaseState); err != nil {
return err
}

eventStateMap := make(map[string]interface{})
if err := json.Unmarshal(data, &eventStateMap); err != nil {
return err
}

e.Exclusive = true

if eventStateMap["exclusive"] != nil {
exclusiveVal, ok := eventStateMap["exclusive"].(bool)
if ok {
e.Exclusive = exclusiveVal
}
}

eventStateRaw := make(map[string]json.RawMessage)
if err := json.Unmarshal(data, &eventStateRaw); err != nil {
return err
}
if err := json.Unmarshal(eventStateRaw["onEvents"], &e.OnEvents); err != nil {
return err
}
if err := unmarshalKey("timeouts", eventStateRaw, &e.Timeout); err != nil {
return err
}

return nil
}

// EventStateTimeout ...
type EventStateTimeout struct {
StateExecTimeout StateExecTimeout `json:"stateExecTimeout,omitempty"`
ActionExecTimeout string `json:"actionExecTimeout,omitempty"`
EventTimeout string `json:"eventTimeout,omitempty"`
}

// OperationState Defines actions be performed. Does not wait for incoming events
type OperationState struct {
BaseState
Expand Down Expand Up @@ -244,6 +192,7 @@ type ForEachState struct {
// State specific timeout
Timeouts *ForEachStateTimeout `json:"timeouts,omitempty"`
// Mode Specifies how iterations are to be performed (sequentially or in parallel)
// Defaults to parallel
Mode ForEachModeType `json:"mode,omitempty"`
}

Expand Down
34 changes: 14 additions & 20 deletions model/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package model
import (
"encoding/json"
"fmt"
"reflect"

"github.com/go-playground/validator/v10"
val "github.com/serverlessworkflow/sdk-go/v2/validator"
"reflect"
)

// InvokeKind defines how the target is invoked.
Expand All @@ -33,13 +34,21 @@ const (
InvokeKindAsync InvokeKind = "async"
)

// ActionMode specifies how actions are to be performed.
type ActionMode string

const (
// DefaultExpressionLang ...
DefaultExpressionLang = "jq"
// ActionModeSequential ...
// ActionModeSequential specifies actions should be performed in sequence
ActionModeSequential ActionMode = "sequential"
// ActionModeParallel ...

// ActionModeParallel specifies actions should be performed in parallel
ActionModeParallel ActionMode = "parallel"
)

const (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason to break it into two const blocks?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as #97 (comment)

// DefaultExpressionLang ...
DefaultExpressionLang = "jq"

// UnlimitedTimeout description for unlimited timeouts
UnlimitedTimeout = "unlimited"
)
Expand Down Expand Up @@ -75,9 +84,6 @@ func continueAsStructLevelValidation(structLevel validator.StructLevel) {
}
}

// ActionMode ...
type ActionMode string

// BaseWorkflow describes the partial Workflow definition that does not rely on generic interfaces
// to make it easy for custom unmarshalers implementations to unmarshal the common data structure.
type BaseWorkflow struct {
Expand Down Expand Up @@ -462,18 +468,6 @@ type OnError struct {
End *End `json:"end,omitempty"`
}

// OnEvents ...
type OnEvents struct {
// References one or more unique event names in the defined workflow events
EventRefs []string `json:"eventRefs" validate:"required,min=1"`
// Specifies how actions are to be performed (in sequence of parallel)
ActionMode ActionMode `json:"actionMode,omitempty"`
// Actions to be performed if expression matches
Actions []Action `json:"actions,omitempty" validate:"omitempty,dive"`
// Event data filter
EventDataFilter EventDataFilter `json:"eventDataFilter,omitempty"`
}

// End definition
type End struct {
// If true, completes all execution flows in the given workflow instance
Expand Down