-
Notifications
You must be signed in to change notification settings - Fork 0
/
pygame_player.py
243 lines (194 loc) · 9.47 KB
/
pygame_player.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import pygame
import numpy # import is unused but required or we fail later
from pygame.constants import K_DOWN, K_UP, KEYDOWN, KEYUP, QUIT
import pygame.surfarray
import pygame.key
def function_intercept(intercepted_func, intercepting_func):
"""
Intercepts a method call and calls the supplied intercepting_func with the result of it's call and it's arguments
Example:
def get_event(result_of_real_event_get, *args, **kwargs):
# do work
return result_of_real_event_get
pygame.event.get = function_intercept(pygame.event.get, get_event)
:param intercepted_func: The function we are going to intercept
:param intercepting_func: The function that will get called after the intercepted func. It is supplied the return
value of the intercepted_func as the first argument and it's args and kwargs.
:return: a function that combines the intercepting and intercepted function, should normally be set to the
intercepted_functions location
"""
def wrap(*args, **kwargs):
# call the function we are intercepting so the screen buffer is updated and get it's result
real_results = intercepted_func(*args, **kwargs)
# call our own function to get the screen buffer
intercepted_results = intercepting_func(real_results, *args, **kwargs)
return intercepted_results
return wrap
class PyGamePlayer(object):
def __init__(self, force_game_fps=10, run_real_time=False, pass_quit_event=True):
"""
Abstract class for learning agents, such as running reinforcement learning neural nets against PyGame games.
The get_keys_pressed and get_feedback methods must be overriden by a subclass to use
Call start method to start playing and intercepting PyGame and training our machine
:param force_game_fps: Fixes the pygame timer functions so the ai will get input as if it were running at this
fps
:type force_game_fps: int
:param run_real_time: If True the game will actually run at the force_game_fps speed
:type run_real_time: bool
:param pass_quit_event: If True the ai will be asked for the quit event
:type pass_quit_event: bool
"""
self.force_game_fps = force_game_fps
"""Fixes the pygame timer functions so the ai will get input as if it were running at this fps"""
self.run_real_time = run_real_time
"""If True the game will actually run at the force_game_fps speed"""
self.pass_quit_event = pass_quit_event
"""Decides whether the quit event should be passed on to the game"""
self._keys_pressed = []
self._last_keys_pressed = []
self._playing = False
self._default_flip = pygame.display.flip
self._default_update = pygame.display.update
self._default_event_get = pygame.event.get
self._default_time_clock = pygame.time.Clock
self._default_get_ticks = pygame.time.get_ticks
self._game_time = 0.0
def get_keys_pressed(self, screen_array, feedback, terminal):
"""
Called whenever the screen buffer is refreshed. returns the keys we want pressed in the next until the next
screen refresh
:param screen_array: 3d numpy.array of float. screen_width * screen_height * rgb
:param feedback: result of call to get_feedback
:param terminal: boolean, True if we have reached a terminal state, meaning the next frame will be a restart
:return: a list of the integer values of the keys we want pressed. See pygame.constants for values
"""
raise NotImplementedError("Please override this method")
def get_feedback(self):
"""
Overridden method should hook into game events to give feeback to the learning agent
:return: First = value we want to give as reward/punishment to our learning agent
Second = Boolean true if we have reached a terminal state
:rtype: tuple (float, boolean)
"""
raise NotImplementedError("Please override this method")
def start(self):
"""
Start playing the game. We will now start listening for screen updates calling our play and reward functions
and returning our intercepted key presses
"""
if self._playing:
raise Exception("Already playing")
# function_intercept(intercepted function, intercepting function), flip is
# redundant to display and is only needed for some games
pygame.display.flip = function_intercept(pygame.display.flip, self._on_screen_update)
# set our on_screen_update function to always get called whenever the screen is updated
pygame.display.update = function_intercept(pygame.display.update, self._on_screen_update)
# pass user or agent actions to game engine
pygame.event.get = function_intercept(pygame.event.get, self._on_event_get)
# Handle game physics to accomdate computation time of the learning agent
pygame.time.Clock = function_intercept(pygame.time.Clock, self._on_time_clock)
pygame.time.get_ticks = function_intercept(pygame.time.get_ticks, self.get_game_time_ms)
# TODO: handle pygame.time.set_timer...
self._playing = True
def stop(self):
"""
Stop playing the game. Will try and return PyGame to the state it was in before we started
"""
if not self._playing:
raise Exception("Already stopped")
pygame.display.flip = self._default_flip
pygame.display.update = self._default_update
pygame.event.get = self._default_event_get
pygame.time.Clock = self._default_time_clock
pygame.time.get_ticks = self._default_get_ticks
self._playing = False
@property
def playing(self):
"""
Returns if we are in a state where we are playing/intercepting PyGame calls
:return: boolean
"""
return self._playing
@playing.setter
def playing(self, value):
if self._playing == value:
return
if self._playing:
self.stop()
else:
self.start()
def get_ms_per_frame(self):
return 1000.0 / self.force_game_fps
def get_game_time_ms(self):
return self._game_time
def _on_time_clock(self, real_clock, *args, **kwargs):
return self._FixedFPSClock(self, real_clock)
def _on_screen_update(self, _, *args, **kwargs):
# get the pixel representation of the game state
surface_array = pygame.surfarray.array3d(pygame.display.get_surface())
# Determine the reward (i.e agent scored or opponent scored), and whether
# a score occurred (e.g. terminal state or not)
reward, terminal = self.get_feedback()
# Get the agent's chosen action
keys = self.get_keys_pressed(surface_array, reward, terminal)
# Update actions
self._last_keys_pressed = self._keys_pressed
self._keys_pressed = keys
# now we have processed a frame increment the game timer
self._game_time += self.get_ms_per_frame()
def _on_event_get(self, _, *args, **kwargs):
key_up_events = []
if len(self._last_keys_pressed) > 0:
diff_list = list(set( self._last_keys_pressed) - set(self._keys_pressed))
key_up_events = [pygame.event.Event(KEYUP, {"key": x}) for x in diff_list]
key_down_events = [pygame.event.Event(KEYDOWN, {"key": x}) for x in self._keys_pressed]
result = []
# have to deal with arg type filters
if args:
if hasattr(args[0], "__iter__"):
args = args[0]
for type_filter in args:
if type_filter == QUIT:
if type_filter == QUIT:
if self.pass_quit_event:
for e in _:
if e.type == QUIT:
result.append(e)
else:
pass # never quit
elif type_filter == KEYUP:
result = result + key_up_events
elif type_filter == KEYDOWN:
result = result + key_down_events
else:
result = key_down_events + key_up_events
if self.pass_quit_event:
for e in _:
if e.type == QUIT:
result.append(e)
return result
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
class _FixedFPSClock(object):
def __init__(self, pygame_player, real_clock):
self._pygame_player = pygame_player
self._real_clock = real_clock
def tick(self, _=None):
if self._pygame_player.run_real_time:
return self._real_clock.tick(self._pygame_player.force_game_fps)
else:
return self._pygame_player.get_ms_per_frame()
def tick_busy_loop(self, _=None):
if self._pygame_player.run_real_time:
return self._real_clock.tick_busy_loop(self._pygame_player.force_game_fps)
else:
return self._pygame_player.get_ms_per_frame()
def get_time(self):
return self._pygame_player.get_game_time_ms()
def get_raw_time(self):
return self._pygame_player.get_game_time_ms()
def get_fps(self):
return int(1.0 / self._pygame_player.get_ms_per_frame())