Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the ability of registerWeak, which could help reduce memory leak #635

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion EventBus/src/org/greenrobot/eventbus/AsyncPoster.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class AsyncPoster implements Runnable, Poster {
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
public void enqueue(ISubscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
Expand Down
2 changes: 1 addition & 1 deletion EventBus/src/org/greenrobot/eventbus/BackgroundPoster.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ final class BackgroundPoster implements Runnable, Poster {
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
public void enqueue(ISubscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
Expand Down
113 changes: 80 additions & 33 deletions EventBus/src/org/greenrobot/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
Expand All @@ -47,7 +50,7 @@ public class EventBus {
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();

private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private final Map<Class<?>, CopyOnWriteArrayList<ISubscription>> subscriptionsByEventType;
private final Map<Object, List<Class<?>>> typesBySubscriber;
private final Map<Class<?>, Object> stickyEvents;

Expand Down Expand Up @@ -112,7 +115,7 @@ public EventBus() {
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
typesBySubscriber = new WeakHashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
Expand All @@ -130,6 +133,24 @@ public EventBus() {
executorService = builder.executorService;
}

/**
* Registers the given subscriber to receive events. Like register() but doesn't maintain a reference of the subscriber.
* Subscribers don't have to call {@link #unregister(Object)} even when they are no longer interested in receiving events.
* <p/>
* Subscribers have event handling methods that must be annotated by {@link Subscribe}.
* The {@link Subscribe} annotation also allows configuration like {@link
* ThreadMode} and priority.
*/
public void registerWeak(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod, new WeakSubscription(subscriber, subscriberMethod));
}
}
}

/**
* Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
* are no longer interested in receiving events.
Expand All @@ -143,16 +164,15 @@ public void register(Object subscriber) {
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
subscribe(subscriber, subscriberMethod, new Subscription(subscriber, subscriberMethod));
}
}
}

// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, ISubscription newSubscription) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
CopyOnWriteArrayList<ISubscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
Expand All @@ -161,11 +181,16 @@ private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
CopyOnWriteArrayList<ISubscription> returnedSubscriptions = pruneEmptySubscription(subscriptions);
if (returnedSubscriptions != subscriptions) {
subscriptions = returnedSubscriptions;
subscriptionsByEventType.put(eventType, returnedSubscriptions);
}
}

int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).getSubscriberMethod().priority) {
subscriptions.add(i, newSubscription);
break;
}
Expand Down Expand Up @@ -199,7 +224,7 @@ private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
}
}

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
private void checkPostStickyEventToSubscription(ISubscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
Expand All @@ -223,13 +248,13 @@ public synchronized boolean isRegistered(Object subscriber) {

/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
List<ISubscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
ISubscription subscription = subscriptions.get(i);
if (subscription.getSubscriber() == subscriber) {
subscription.setIsActive(false);
subscriptions.remove(i);
i--;
size--;
Expand Down Expand Up @@ -290,7 +315,7 @@ public void cancelEventDelivery(Object event) {
throw new EventBusException("Event may not be null");
} else if (postingState.event != event) {
throw new EventBusException("Only the currently handled event may be aborted");
} else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
} else if (postingState.subscription.getSubscriberMethod().threadMode != ThreadMode.POSTING) {
throw new EventBusException(" event handlers may only abort the incoming event");
}

Expand Down Expand Up @@ -364,7 +389,7 @@ public boolean hasSubscriberForEvent(Class<?> eventClass) {
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
CopyOnWriteArrayList<Subscription> subscriptions;
CopyOnWriteArrayList<ISubscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(clazz);
}
Expand Down Expand Up @@ -401,12 +426,12 @@ private void postSingleEvent(Object event, PostingThreadState postingState) thro
}

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
CopyOnWriteArrayList<ISubscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
for (ISubscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
Expand All @@ -427,8 +452,26 @@ private boolean postSingleEventForEventType(Object event, PostingThreadState pos
return false;
}

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
private CopyOnWriteArrayList<ISubscription> pruneEmptySubscription(CopyOnWriteArrayList<ISubscription> origin) {
boolean modified = false;
LinkedList<ISubscription> copy = new LinkedList<>(origin);
Iterator<ISubscription> iterator = copy.iterator();
while (iterator.hasNext()) {
ISubscription subscription = iterator.next();
if (subscription.getSubscriber() == null) {
iterator.remove();
modified = true;
}
}
if (modified) {
return new CopyOnWriteArrayList<>(copy);
} else {
return origin;
}
}

private void postToSubscription(ISubscription subscription, Object event, boolean isMainThread) {
switch (subscription.getSubscriberMethod().threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
Expand Down Expand Up @@ -458,7 +501,7 @@ private void postToSubscription(Subscription subscription, Object event, boolean
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
throw new IllegalStateException("Unknown thread mode: " + subscription.getSubscriberMethod().threadMode);
}
}

Expand Down Expand Up @@ -498,28 +541,32 @@ static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
*/
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
ISubscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
if (subscription.isActive()) {
invokeSubscriber(subscription, event);
}
}

void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
void invokeSubscriber(ISubscription subscription, Object event) {
Object subscriber = subscription.getSubscriber();
if (subscriber != null) {
try {
subscription.getSubscriberMethod().method.invoke(subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
}

private void handleSubscriberException(Subscription subscription, Object event, Throwable cause) {
private void handleSubscriberException(ISubscription subscription, Object event, Throwable cause) {
Object subscriber = subscription.getSubscriber();
if (event instanceof SubscriberExceptionEvent) {
if (logSubscriberExceptions) {
// Don't send another SubscriberExceptionEvent to avoid infinite event recursion, just log
logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass()
logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + (subscriber != null ? subscriber.getClass() : null)
+ " threw an exception", cause);
SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event;
logger.log(Level.SEVERE, "Initial event " + exEvent.causingEvent + " caused exception in "
Expand All @@ -531,11 +578,11 @@ private void handleSubscriberException(Subscription subscription, Object event,
}
if (logSubscriberExceptions) {
logger.log(Level.SEVERE, "Could not dispatch event: " + event.getClass() + " to subscribing class "
+ subscription.subscriber.getClass(), cause);
+ (subscriber != null ? subscriber.getClass() : null), cause);
}
if (sendSubscriberExceptionEvent) {
SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event,
subscription.subscriber);
subscription.getSubscriber());
post(exEvent);
}
}
Expand All @@ -546,7 +593,7 @@ final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
ISubscription subscription;
Object event;
boolean canceled;
}
Expand Down
2 changes: 1 addition & 1 deletion EventBus/src/org/greenrobot/eventbus/HandlerPoster.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHan
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
public void enqueue(ISubscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
Expand Down
12 changes: 12 additions & 0 deletions EventBus/src/org/greenrobot/eventbus/ISubscription.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.greenrobot.eventbus;

public interface ISubscription {

public Object getSubscriber();

public SubscriberMethod getSubscriberMethod();

public boolean isActive();

public void setIsActive(boolean isActive);
}
6 changes: 3 additions & 3 deletions EventBus/src/org/greenrobot/eventbus/PendingPost.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event;
Subscription subscription;
ISubscription subscription;
PendingPost next;

private PendingPost(Object event, Subscription subscription) {
private PendingPost(Object event, ISubscription subscription) {
this.event = event;
this.subscription = subscription;
}

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
static PendingPost obtainPendingPost(ISubscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
Expand Down
2 changes: 1 addition & 1 deletion EventBus/src/org/greenrobot/eventbus/Poster.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ interface Poster {
* @param subscription Subscription which will receive the event.
* @param event Event that will be posted to subscribers.
*/
void enqueue(Subscription subscription, Object event);
void enqueue(ISubscription subscription, Object event);
}
36 changes: 28 additions & 8 deletions EventBus/src/org/greenrobot/eventbus/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,34 @@
*/
package org.greenrobot.eventbus;

final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
final class Subscription implements ISubscription {
private final Object subscriber;
private final SubscriberMethod subscriberMethod;
/**
* Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery
* {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.
*/
volatile boolean active;
private volatile boolean active;

@Override
public Object getSubscriber() {
return subscriber;
}

@Override
public SubscriberMethod getSubscriberMethod() {
return subscriberMethod;
}

@Override
public boolean isActive() {
return active;
}

@Override
public void setIsActive(boolean isActive) {
this.active = isActive;
}

Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
this.subscriber = subscriber;
Expand All @@ -32,10 +52,10 @@ final class Subscription {

@Override
public boolean equals(Object other) {
if (other instanceof Subscription) {
Subscription otherSubscription = (Subscription) other;
return subscriber == otherSubscription.subscriber
&& subscriberMethod.equals(otherSubscription.subscriberMethod);
if (other instanceof ISubscription) {
ISubscription otherSubscription = (ISubscription) other;
return subscriber == otherSubscription.getSubscriber()
&& subscriberMethod.equals(otherSubscription.getSubscriberMethod());
} else {
return false;
}
Expand Down
Loading