Skip to content

Commit

Permalink
revert just to annoy dan
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzawa-san committed Jun 12, 2024
1 parent 34a859d commit 32120e4
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 74 deletions.
120 changes: 62 additions & 58 deletions src/main/java/com/jyuzawa/googolplex_theater/GoogolplexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jmdns.ServiceEvent;
import javax.jmdns.ServiceInfo;
import javax.jmdns.impl.util.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* This class represents the state of the application. All modifications to state occur in the same
Expand All @@ -53,7 +56,7 @@ public class GoogolplexService implements Closeable {
private final Map<String, DeviceInfo> nameToDeviceInfo;
private final Map<String, InetSocketAddress> nameToAddress;
private final Map<String, Channel> nameToChannel;
private final Scheduler executor;
private final ExecutorService executor;

@Autowired
public GoogolplexService(GoogolplexClient client) {
Expand All @@ -62,40 +65,38 @@ public GoogolplexService(GoogolplexClient client) {
this.nameToDeviceInfo = new ConcurrentHashMap<>();
this.nameToAddress = new ConcurrentHashMap<>();
this.nameToChannel = new ConcurrentHashMap<>();
this.executor = Schedulers.newSingle("controller");
this.executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("controller"));
}

private record Channel(AtomicReference<Instant> birth, Disposable disposable) {}

/**
* Load the config and propagate the changes to the any currently connected devices.
*
* @param deviceInfos the devices loaded from the config file
* @param deviceInfos the device settings loaded from the file
*/
public Disposable processDeviceConfig(List<DeviceInfo> deviceInfos) {
return executor.schedule(() -> processDeviceConfig0(deviceInfos));
}

void processDeviceConfig0(List<DeviceInfo> deviceInfos) {
Set<String> namesToRemove = new HashSet<>(nameToDeviceInfo.keySet());
for (DeviceInfo deviceInfo : deviceInfos) {
String name = deviceInfo.getName();
// mark that we should not remove this device
namesToRemove.remove(name);
DeviceInfo oldDeviceInfo = nameToDeviceInfo.get(name);
// ignore unchanged devices
if (!deviceInfo.equals(oldDeviceInfo)) {
log.info("CONFIG_UPDATED '{}'", name);
nameToDeviceInfo.put(name, deviceInfo);
public Future<?> processDeviceConfig(List<DeviceInfo> deviceInfos) {
return executor.submit(() -> {
Set<String> namesToRemove = new HashSet<>(nameToDeviceInfo.keySet());
for (DeviceInfo deviceInfo : deviceInfos) {
String name = deviceInfo.getName();
// mark that we should not remove this device
namesToRemove.remove(name);
DeviceInfo oldDeviceInfo = nameToDeviceInfo.get(name);
// ignore unchanged devices
if (!deviceInfo.equals(oldDeviceInfo)) {
log.info("CONFIG_UPDATED '{}'", name);
nameToDeviceInfo.put(name, deviceInfo);
apply(name);
}
}
// remove devices that were missing in the new config
for (String name : namesToRemove) {
log.info("CONFIG_REMOVED '{}'", name);
nameToDeviceInfo.remove(name);
apply(name);
}
}
// remove devices that were missing in the new config
for (String name : namesToRemove) {
log.info("CONFIG_REMOVED '{}'", name);
nameToDeviceInfo.remove(name);
apply(name);
}
});
}

/**
Expand All @@ -104,36 +105,34 @@ void processDeviceConfig0(List<DeviceInfo> deviceInfos) {
*
* @param event mdns info
*/
public Disposable register(ServiceEvent event) {
return executor.schedule(() -> register0(event));
}

void register0(ServiceEvent event) {
// the device information may not be full
ServiceInfo info = event.getInfo();
String name = info.getPropertyString("fn");
if (name == null) {
log.debug("Found unnamed cast:\n{}", info);
return;
}
InetAddress[] addresses = info.getInetAddresses();
if (addresses == null || addresses.length == 0) {
log.debug("Found unaddressable cast:\n{}", info);
return;
}
/*
* we choose the first address. there should usually be just one. the mdns library returns ipv4
* addresses before ipv6.
*/
InetSocketAddress address = new InetSocketAddress(addresses[0], info.getPort());
InetSocketAddress oldAddress = nameToAddress.put(name, address);
if (!address.equals(oldAddress)) {
public Future<?> register(ServiceEvent event) {
return executor.submit(() -> {
// the device information may not be full
ServiceInfo info = event.getInfo();
String name = info.getPropertyString("fn");
if (name == null) {
log.debug("Found unnamed cast:\n{}", info);
return;
}
InetAddress[] addresses = info.getInetAddresses();
if (addresses == null || addresses.length == 0) {
log.debug("Found unaddressable cast:\n{}", info);
return;
}
/*
* this is a newly discovered device, or an existing device whose address was updated.
* we choose the first address. there should usually be just one. the mdns library returns ipv4
* addresses before ipv6.
*/
log.info("REGISTER '{}' {}", name, address);
apply(name);
}
InetSocketAddress address = new InetSocketAddress(addresses[0], info.getPort());
InetSocketAddress oldAddress = nameToAddress.put(name, address);
if (!address.equals(oldAddress)) {
/*
* this is a newly discovered device, or an existing device whose address was updated.
*/
log.info("REGISTER '{}' {}", name, address);
apply(name);
}
});
}

/**
Expand Down Expand Up @@ -169,8 +168,8 @@ private void apply(String name) {
*
* @param name the device to refresh
*/
public Disposable refresh(String name) {
return executor.schedule(() -> {
public Future<?> refresh(String name) {
return executor.submit(() -> {
// closing channels will cause them to reconnect
if (name == null) {
// close all channels
Expand Down Expand Up @@ -218,7 +217,12 @@ public List<DeviceStatus> getDeviceInfo() {
public void close() {
nameToDeviceInfo.clear();
refresh(null);
executor.disposeGracefully().block(Duration.ofSeconds(10));
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// pass
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ void loaderTest() throws IOException, InterruptedException {
}
BlockingQueue<List<DeviceInfo>> queue = new ArrayBlockingQueue<>(10);
GoogolplexService controller = Mockito.mock(GoogolplexService.class);
Mockito.when(controller.processDeviceConfig(Mockito.any())).then(new Answer<Void>() {
Mockito.doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
List<DeviceInfo> newDevices = invocation.getArgument(0);
queue.add(newDevices);
return null;
}
});
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
List<DeviceInfo> newDevices = invocation.getArgument(0);
queue.add(newDevices);
return null;
}
})
.when(controller)
.processDeviceConfig(Mockito.any());
ServiceDiscovery serviceDiscovery = Mockito.mock(ServiceDiscovery.class);
String ipAddress = "192.168.1.239";
InetAddress address = InetAddress.getByName(ipAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,31 +65,31 @@ void test() throws Exception {
devices.add(cast2.device());
devices.add(cast3.device());
devices.add(cast4.device());
service.register0(cast1.event());
service.register0(cast2.event());
service.register(cast1.event()).get();
service.register(cast2.event()).get();
Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.any(), Mockito.any());
service.processDeviceConfig0(devices);
service.processDeviceConfig(devices).get();
Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast1.device()), Mockito.any());
Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast2.device()), Mockito.any());
Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any());
Mockito.verify(client, Mockito.never()).connect(Mockito.any(), Mockito.eq(cast4.device()), Mockito.any());
service.register(cast3.event());
service.register(cast4.event());
service.register(cast3.event()).get();
service.register(cast4.event()).get();
Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast3.device()), Mockito.any());
Mockito.verify(client).connect(Mockito.any(), Mockito.eq(cast4.device()), Mockito.any());
service.register(FakeCast.event(9005, "UnknownCast"));
service.register(FakeCast.event(9005, "UnknownCast")).get();
ServiceEvent noName = Mockito.mock(ServiceEvent.class);
ServiceInfo noNameInfo = Mockito.mock(ServiceInfo.class);
Mockito.when(noName.getInfo()).thenReturn(noNameInfo);
Mockito.when(noNameInfo.getPropertyString(Mockito.anyString())).thenReturn(null);
service.register(noName);
service.register(noName).get();
ServiceEvent noAddr = Mockito.mock(ServiceEvent.class);
Mockito.when(noAddr.getName()).thenReturn("Chromecast-NOIP.local");
ServiceInfo noAddrInfo = Mockito.mock(ServiceInfo.class);
Mockito.when(noAddr.getInfo()).thenReturn(noAddrInfo);
Mockito.when(noAddrInfo.getPropertyString(Mockito.anyString())).thenReturn("NOIP");
Mockito.when(noAddrInfo.getInetAddresses()).thenReturn(new InetAddress[] {});
service.register(noAddr);
service.register(noAddr).get();

List<DeviceStatus> deviceInfos = service.getDeviceInfo();
Set<String> configureds = getConfigureds(deviceInfos);
Expand Down

0 comments on commit 32120e4

Please sign in to comment.