-
Notifications
You must be signed in to change notification settings - Fork 2
/
device.py
329 lines (262 loc) · 12.7 KB
/
device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
#!/usr/bin/python
# -*- coding:utf8 -*-
import random
import math
random.seed() # 随机数种子
class Interface(object):
def __init__(self, i_id):
self.__id = i_id
def get_id(self):
return self.__id
class Highway(object):
def __init__(self, start_x_point, start_y_point, end_x_point, end_y_point):
self.__start_x_point = start_x_point
self.__start_y_point = start_y_point
self.__end_x_point = end_x_point
self.__end_y_point = end_y_point
def get_start_x_point(self):
return self.__start_x_point
def get_start_y_point(self):
return self.__start_y_point
def get_end_x_point(self):
return self.__end_x_point
def get_end_y_point(self):
return self.__end_y_point
class Vehicle(Interface):
def __init__(self, i_id):
# 调用父类的构造函数
Interface.__init__(self, i_id)
# 随机生成车辆行进方向
rand_num = random.random()
if rand_num > 0.5:
self.__direction = 1 # 正方向
else:
self.__direction = -1 # 负方向
self.__x_point = -1
self.__y_point = -1
self.__allocated_RB = []
def set_location(self, x_point, y_point):
self.__x_point = x_point
self.__y_point = y_point
def get_x_point(self):
return self.__x_point
def get_y_point(self):
return self.__y_point
def get_direction(self):
return self.__direction
def set_allocated_rb(self, rb_id):
self.__allocated_RB.append(rb_id)
def get_allocated_rb(self):
return self.__allocated_RB
class V2IVehicle(Vehicle):
def __init__(self, i_id):
# 调用父类的构造函数
Vehicle.__init__(self, i_id)
self.__txID = 0
self.__inter_txID2weight = {} # 存储干扰权值的字典 键——干扰发射机ID 值——干扰权值
self.__sinr = 0
def update_location(self, highway):
# 随机生成车辆位置
temp_x = random.random() * (highway.get_end_x_point() - highway.get_start_x_point())
x_point = highway.get_start_x_point() + temp_x
temp_y = random.random() * (highway.get_end_y_point() - highway.get_start_y_point())
y_point = highway.get_start_y_point() + temp_y
Vehicle.set_location(self, x_point, y_point)
def work(self, dict_id2tx, dict_id2channel):
target_tx = dict_id2tx[self.__txID] # 目标发射机
target_power = target_tx.get_power() # dBm
target_power = pow(10, (target_power - 30) / 10) # W
target_channel = dict_id2channel[self.get_id()]
target_link_loss = target_channel.get_link_loss(self.__txID) # dB
target_gain = pow(10, -target_link_loss / 10)
for tx_id in dict_id2tx:
if tx_id != self.__txID:
inter_tx = dict_id2tx[tx_id] # 干扰发射机
inter_power = inter_tx.get_power() # dBm
inter_power = pow(10, (inter_power - 30) / 10) # W
inter_channel = dict_id2channel[self.get_id()]
inter_link_loss = inter_channel.get_link_loss(tx_id) # dB
inter_gain = pow(10, -inter_link_loss / 10)
weight = (inter_power * inter_gain) / (target_power * target_gain)
self.__inter_txID2weight[tx_id] = weight
def get_tx_id(self):
return self.__txID
def get_weight(self, tx_id):
return self.__inter_txID2weight[tx_id]
def comp_sinr(self, dict_id2tx, dict_id2channel): # 计算接收 SINR
if len(self.get_allocated_rb()):
# 计算噪声功率 1个RB, 12个连续的载波, 12 * 15000 = 180000Hz
white_noise = -174 # -174dBm / Hz
noise_fig = 5 # dB
noise_fig = pow(10, noise_fig / 10) # 线性值
thermalNoisePow = pow(10, (white_noise - 30) / 10) * 180000 * noise_fig # 线性值
# 计算接收目标信号功率
target_tx = dict_id2tx[self.__txID] # 目标发射机
target_power = target_tx.get_power() # dBm
target_power = pow(10, (target_power - 30) / 10) # W
target_channel = dict_id2channel[self.get_id()]
target_link_loss = target_channel.get_link_loss(self.__txID) # dB
target_gain = pow(10, -target_link_loss / 10)
receive_target_power = target_power * target_gain
# 计算接收干扰信号总功率
receive_inter_power = 0
for tx_id in dict_id2tx:
if tx_id != self.__txID:
if self.get_allocated_rb()[0] in dict_id2tx[tx_id].get_allocated_rb():
inter_tx = dict_id2tx[tx_id] # 干扰发射机
inter_power = inter_tx.get_power() # dBm
inter_power = pow(10, (inter_power - 30) / 10) # W
inter_channel = dict_id2channel[self.get_id()]
inter_link_loss = inter_channel.get_link_loss(tx_id) # dB
inter_gain = pow(10, -inter_link_loss / 10)
receive_inter_power += inter_power * inter_gain
self.__sinr = 10 * math.log10(receive_target_power / (receive_inter_power + thermalNoisePow))
def get_sinr(self):
return self.__sinr
class V2VTxVehicle(Vehicle):
def __init__(self, i_id):
# 调用父类的构造函数
Vehicle.__init__(self, i_id)
self.__rxID = -1
self.__power = 5 # 车辆发射功率 dBm
def update_location(self, highway):
# 随机生成车辆位置
temp_x = random.random() * (highway.get_end_x_point() - highway.get_start_x_point())
x_point = highway.get_start_x_point() + temp_x
temp_y = random.random() * (highway.get_end_y_point() - highway.get_start_y_point())
y_point = highway.get_start_y_point() + temp_y
Vehicle.set_location(self, x_point, y_point)
def set_rx_id(self, rx_id):
self.__rxID = rx_id
def get_power(self):
return self.__power
class V2VRxVehicle(Vehicle):
def __init__(self, i_id):
# 调用父类的构造函数
Vehicle.__init__(self, i_id)
self.__txID = -1
self.__inter_txID2weight = {} # 存储干扰权值的字典 键——干扰发射机ID 值——干扰权值
self.__sinr = 0
def update_location(self, highway, v2v_tx_vehicle):
temp_x = (random.random() - 0.5) * 10 # 设 V2V 之间最大间距500m
temp_y = (random.random() - 0.5) * 10
if highway.get_end_x_point() - highway.get_start_x_point():
x_point = v2v_tx_vehicle.get_x_point() + temp_x
y_point = v2v_tx_vehicle.get_y_point()
else:
x_point = v2v_tx_vehicle.get_x_point()
y_point = v2v_tx_vehicle.get_y_point() + temp_y
Vehicle.set_location(self, x_point, y_point)
self.__txID = v2v_tx_vehicle.get_id()
v2v_tx_vehicle.set_rx_id(self.get_id())
def work(self, dict_id2tx, dict_id2channel):
target_tx = dict_id2tx[self.__txID] # 目标发射机
target_power = target_tx.get_power() # dBm
target_power = pow(10, (target_power - 30) / 10) # W
target_channel = dict_id2channel[self.get_id()]
target_link_loss = target_channel.get_link_loss(self.__txID) # dB
target_gain = pow(10, -target_link_loss / 10)
for tx_id in dict_id2tx:
if tx_id != self.__txID:
inter_tx = dict_id2tx[tx_id] # 干扰发射机
inter_power = inter_tx.get_power() # dBm
inter_power = pow(10, (inter_power - 30) / 10) # W
inter_channel = dict_id2channel[self.get_id()]
inter_link_loss = inter_channel.get_link_loss(tx_id) # dB
inter_gain = pow(10, -inter_link_loss / 10)
weight = (inter_power * inter_gain) / (target_power * target_gain)
self.__inter_txID2weight[tx_id] = weight
def get_tx_id(self):
return self.__txID
def get_weight(self, tx_id):
return self.__inter_txID2weight[tx_id]
def comp_sinr(self, dict_id2tx, dict_id2channel): # 计算接收 SINR
if len(self.get_allocated_rb()):
# 计算噪声功率 1个RB, 12个连续的载波, 12 * 15000 = 180000Hz
white_noise = -174 # -174dBm / Hz
noise_fig = 5 # dB
noise_fig = pow(10, noise_fig / 10) # 线性值
thermalNoisePow = pow(10, (white_noise - 30) / 10) * 180000 * noise_fig # 线性值
# 计算接收目标信号功率
target_tx = dict_id2tx[self.__txID] # 目标发射机
target_power = target_tx.get_power() # dBm
target_power = pow(10, (target_power - 30) / 10) # W
target_channel = dict_id2channel[self.get_id()]
target_link_loss = target_channel.get_link_loss(self.__txID) # dB
target_gain = pow(10, -target_link_loss / 10)
receive_target_power = target_power * target_gain
# 计算接收干扰信号总功率
receive_inter_power = 0
for tx_id in dict_id2tx:
if tx_id != self.__txID:
if self.get_allocated_rb()[0] in dict_id2tx[tx_id].get_allocated_rb():
inter_tx = dict_id2tx[tx_id] # 干扰发射机
inter_power = inter_tx.get_power() # dBm
inter_power = pow(10, (inter_power - 30) / 10) # W
inter_channel = dict_id2channel[self.get_id()]
inter_link_loss = inter_channel.get_link_loss(tx_id) # dB
inter_gain = pow(10, -inter_link_loss / 10)
receive_inter_power += inter_power * inter_gain
self.__sinr = 10 * math.log10(receive_target_power / (receive_inter_power + thermalNoisePow))
def get_sinr(self):
return self.__sinr
class RSU(Interface):
def __init__(self, i_id):
# 调用父类的构造函数
Interface.__init__(self, i_id)
self.__power = 20 # 发射功率 dBm
self.__x_point = 0
self.__y_point = 0
self.__direction = 0 # 路边单元静止 没有行进方向
self.__allocated_RB = []
def get_x_point(self):
return self.__x_point
def get_y_point(self):
return self.__y_point
def get_direction(self):
return self.__direction
def get_power(self):
return self.__power
def set_allocated_rb(self, rb_id):
self.__allocated_RB.append(rb_id)
def get_allocated_rb(self):
return self.__allocated_RB
class Channel(object):
def __init__(self, rx_id):
self.__rx_id = rx_id
self.__link_loss = {} # 存储链路损耗的字典 键——发射机id值 值——链路损耗
self.__id2direction = {} # 存储行进方向的字典 键——发射机id值 值——方向
self.__id2distance = {} # 存储距离的字典 键——发射机id值 值——距离
def update_link_loss(self, tx_device, rx_device):
distance = get_distance(tx_device.get_x_point(), tx_device.get_y_point(),
rx_device.get_x_point(), rx_device.get_y_point())
self.__id2distance[tx_device.get_id()] = distance
if tx_device.get_direction() == 0: # 发射机是路边单元
if rx_device.get_direction() * rx_device.get_x_point() > 0:
direction = 1 # 车辆行驶方向与车辆坐标同号,车辆相对于路边单元反向行驶
self.__id2direction[tx_device.get_id()] = "reverse"
else:
direction = -1
self.__id2direction[tx_device.get_id()] = "forward"
else: # 发射机是车辆
if tx_device.get_direction() * rx_device.get_direction() > 0:
direction = 0 # 发射机和接收机同向行驶
self.__id2direction[tx_device.get_id()] = "convoy"
elif (rx_device.get_x_point() - tx_device.get_x_point()) * rx_device.get_direction() > 0:
direction = 1
self.__id2direction[tx_device.get_id()] = "reverse"
else:
direction = -1
self.__id2direction[tx_device.get_id()] = "forward"
link_loss = 63.3 + 10 * math.log10(distance / 10) + random.uniform(0, 3.1) + direction * 3.3
self.__link_loss[tx_device.get_id()] = link_loss
def get_rx_id(self):
return self.__rx_id
def get_link_loss(self, tx_id):
return self.__link_loss[tx_id]
def get_direction(self, tx_id):
return self.__id2direction[tx_id]
def get_distance(self, tx_id):
return self.__id2distance[tx_id]
def get_distance(x1, y1, x2, y2):
return pow(pow((x1 - x2), 2) + pow((y1 - y2), 2), 0.5)