-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathevent_sourced_aggregate.rb
More file actions
158 lines (128 loc) · 3.73 KB
/
event_sourced_aggregate.rb
File metadata and controls
158 lines (128 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
require "bundler/inline"
gemfile true do
source "https://rubygems.org"
gem "ruby_event_store", "~> 2.16"
gem "aggregate_root", "~> 2.16"
gem "ostruct"
end
require_relative "dcb_event_store"
# based on https://dcb.events/examples/event-sourced-aggregate/
# event type definitions:
class CourseDefined < RubyEventStore::Event
def self.tags = ->(event) { "course:#{event.data[:course_id]}" }
end
class CourseCapacityChanged < RubyEventStore::Event
def self.tags = ->(event) { "course:#{event.data[:course_id]}" }
end
class StudentSubscribedToCourse < RubyEventStore::Event
def self.tags =
->(event) do
["student:#{event.data[:student_id]}", "course:#{event.data[:course_id]}"]
end
end
# commands
DefineCourse = Data.define(:course_id, :capacity)
ChangeCourseCapacity = Data.define(:course_id, :new_capacity)
SubscribeStudentToCourse = Data.define(:course_id, :student_id)
class CourseAggregate
include AggregateRoot
def initialize
@number_of_subscriptions = 0
end
def create(id, title, capacity)
apply CourseDefined.new(
data: {
course_id: id,
title: title,
capacity: capacity
}
)
end
def change_capacity(new_capacity)
if new_capacity == capacity
raise Error,
"Course #{course_id} already has a capacity of #{new_capacity}"
end
if new_capacity < number_of_subscriptions
raise Error,
"Course #{course_id} already has #{number_of_subscriptions} active subscriptions, can't set the capacity below that"
end
apply CourseCapacityChanged.new(
data: {
course_id: course_id,
new_capacity: new_capacity
}
)
end
def subscribe_student(student_id)
if number_of_subscriptions == capacity
raise Error, "Course #{course_id} is already fully booked"
end
apply StudentSubscribedToCourse.new(
data: {
student_id: student_id,
course_id: course_id
}
)
end
attr_reader :course_id
private
attr_reader :title, :capacity, :number_of_subscriptions
on CourseDefined do |event|
@course_id = event.data[:course_id]
@title = event.data[:title]
@capacity = event.data[:capacity]
end
on CourseCapacityChanged do |event|
@capacity = event.data[:new_capacity]
end
on StudentSubscribedToCourse do |_event|
@number_of_subscriptions += 1
end
end
# DCB flavored repository
class DcbCourseRepository
def initialize(event_store)
@event_store = event_store
end
def load(course_id)
stream_name = "course:#{course_id}"
aggregate = CourseAggregate.new
query(stream_name).reduce { |_, ev| aggregate.apply(ev) }
aggregate.version = aggregate.unpublished_events.to_a.last.event_id
aggregate
end
def save(aggregate)
stream_name = "course:#{aggregate.course_id}"
event_store.append(
aggregate.unpublished_events.to_a,
query(stream_name),
aggregate.version == -1 ? nil : aggregate.version
)
aggregate.version = aggregate.unpublished_events.to_a.last.event_id
end
private
attr_reader :event_store
def query(stream_name)
event_store.read.stream(stream_name)
end
end
event_store = DcbEventStore.new
# create and save a new instance:
repository = DcbCourseRepository.new(event_store)
course = CourseAggregate.new
course.create("c1", "Course 01", 10)
repository.save(course)
# update an existing instance:
course2 = repository.load("c1")
course2.change_capacity(15)
repository.save(course2)
# read data here
puts "Stored events:\n---\n"
event_store
.read
.stream("course:c1")
.each do |event|
puts [event.class, event.timestamp, event.data].map(&:inspect).join("\n")
puts "---\n"
end