View Javadoc

1   package de.mxro.service.internal;
2   
3   import java.util.ArrayList;
4   import java.util.IdentityHashMap;
5   import java.util.LinkedList;
6   import java.util.List;
7   
8   import de.mxro.fn.Success;
9   import de.mxro.service.SafeCast;
10  import de.mxro.service.Service;
11  import de.mxro.service.ServiceRegistry;
12  import de.mxro.service.callbacks.GetServiceCallback;
13  import de.mxro.service.callbacks.ShutdownCallback;
14  import delight.async.Operation;
15  import delight.async.callbacks.SimpleCallback;
16  import delight.async.callbacks.ValueCallback;
17  
18  public class ServiceRegistryImpl implements ServiceRegistry {
19  
20      private final boolean ENABLE_LOG = false;
21  
22      private final List<Service> services;
23      private final IdentityHashMap<Service, Integer> subscribed;
24      private final IdentityHashMap<Service, List<InitializationEntry>> initializing;
25      private final IdentityHashMap<Service, List<DeinitializationEntry>> deinitializing;
26  
27      private final class DeinitializationEntry {
28          public ShutdownCallback callback;
29      }
30  
31      private final class InitializationEntry {
32  
33          public GetServiceCallback<Object> callback;
34      }
35  
36      @Override
37      public void register(final Service service) {
38          synchronized (services) {
39              services.add(service);
40          }
41      }
42  
43      @Override
44      public <InterfaceType> Operation<InterfaceType> subscribe(final Class<InterfaceType> clazz) {
45          return new Operation<InterfaceType>() {
46  
47              @Override
48              public void apply(final ValueCallback<InterfaceType> callback) {
49                  subscribe(clazz, callback);
50              }
51          };
52      }
53  
54      @SuppressWarnings("unchecked")
55      @Override
56      public <InterfaceType> void subscribe(final Class<InterfaceType> clazz, final ValueCallback<InterfaceType> callback) {
57  
58          if (ENABLE_LOG) {
59              System.out.println(this + ": Subscribing for " + clazz);
60          }
61  
62          ArrayList<Service> servicesCopy;
63          synchronized (services) {
64              servicesCopy = new ArrayList<Service>(services);
65          }
66  
67          for (final Service service : servicesCopy) {
68  
69              if (clazz.equals(service.getClass())
70                      || (service instanceof SafeCast && ((SafeCast) service).supports(clazz))) {
71  
72                  final Integer subscribers;
73                  synchronized (subscribed) {
74                      subscribers = subscribed.get(service);
75  
76                      if (subscribers != null && subscribers > 0) {
77                          if (ENABLE_LOG) {
78                              System.out.println(this + ": Subscribed service " + service + " for the " + subscribers + 1
79                                      + " time.");
80                          }
81                          synchronized (subscribed) {
82                              subscribed.put(service, subscribers + 1);
83                          }
84  
85                      }
86  
87                  }
88  
89                  if (subscribers != null && subscribers > 0) {
90                      callback.onSuccess((InterfaceType) service);
91                      return;
92                  }
93  
94                  synchronized (initializing) {
95                      if (initializing.containsKey(service)) {
96                          final InitializationEntry e = new InitializationEntry();
97                          e.callback = (GetServiceCallback<Object>) callback;
98                          initializing.get(service).add(e);
99                          return;
100                     }
101 
102                     initializing.put(service, new LinkedList<InitializationEntry>());
103                 }
104 
105                 synchronized (deinitializing) {
106                     if (deinitializing.containsKey(service)) {
107                         final DeinitializationEntry e = new DeinitializationEntry();
108                         e.callback = new ShutdownCallback() {
109 
110                             @Override
111                             public void onSuccess() {
112                                 subscribe(clazz, callback);
113                             }
114 
115                             @Override
116                             public void onFailure(final Throwable t) {
117                                 callback.onFailure(new Exception("Error during pending deinitialization.", t));
118                             }
119                         };
120                         deinitializing.get(service).add(e);
121                         return;
122                     }
123                 }
124 
125                 service.start(new SimpleCallback() {
126 
127                     @Override
128                     public void onSuccess() {
129 
130                         final Integer subscribers;
131                         synchronized (subscribed) {
132                             subscribers = subscribed.get(service);
133                             if (subscribers == null) {
134                                 subscribed.put(service, 1);
135                             } else {
136                                 subscribed.put(service, subscribers + 1);
137                             }
138                         }
139 
140                         synchronized (initializing) {
141 
142                             final List<InitializationEntry> entries = initializing.get(service);
143                             for (final InitializationEntry e : entries) {
144                                 e.callback.onSuccess(service);
145                                 return;
146                             }
147                             initializing.remove(service);
148                         }
149 
150                         if (ENABLE_LOG) {
151 
152                             System.out.println(this + ": Subscribed service " + service);
153                         }
154 
155                         callback.onSuccess((InterfaceType) service);
156 
157                     }
158 
159                     @Override
160                     public void onFailure(final Throwable t) {
161                         callback.onFailure(t);
162                     }
163                 });
164 
165                 return;
166             }
167 
168         }
169         throw new RuntimeException("No service in registry which supports interface " + clazz);
170 
171     }
172 
173     @Override
174     public Operation<Success> unsubscribe(final Object service) {
175         return new Operation<Success>() {
176 
177             @Override
178             public void apply(final ValueCallback<Success> callback) {
179                 unsubscribe(service, new SimpleCallback() {
180 
181                     @Override
182                     public void onFailure(final Throwable t) {
183                         callback.onFailure(t);
184                     }
185 
186                     @Override
187                     public void onSuccess() {
188                         callback.onSuccess(Success.INSTANCE);
189                     }
190                 });
191             }
192         };
193     }
194 
195     @Override
196     public void unsubscribe(final Object service_raw, final SimpleCallback callback) {
197         final Service service = (Service) service_raw;
198 
199         if (ENABLE_LOG) {
200 
201             System.out.println(this + ": Unsubscribe service " + service_raw);
202         }
203 
204         synchronized (subscribed) {
205             final Integer subscribers = subscribed.get(service);
206 
207             if (subscribers == null || subscribers == 0) {
208                 throw new IllegalArgumentException("Trying to unsubscribe service that has not been subscribed to.");
209             }
210 
211             if (subscribers == 1) {
212                 subscribed.remove(service);
213 
214                 synchronized (deinitializing) {
215 
216                     assert !deinitializing.containsKey(service);
217 
218                     deinitializing.put(service, new LinkedList<ServiceRegistryImpl.DeinitializationEntry>());
219 
220                     service.stop(new ShutdownCallback() {
221 
222                         @Override
223                         public void onSuccess() {
224                             synchronized (deinitializing) {
225 
226                                 for (final DeinitializationEntry e : deinitializing.get(service)) {
227                                     e.callback.onSuccess();
228                                 }
229 
230                                 deinitializing.remove(service);
231 
232                             }
233 
234                             callback.onSuccess();
235                         }
236 
237                         @Override
238                         public void onFailure(final Throwable t) {
239                             callback.onFailure(t);
240                         }
241                     });
242 
243                 }
244                 return;
245 
246             }
247 
248             subscribed.put(service, subscribers - 1);
249         }
250         callback.onSuccess();
251 
252     }
253 
254     public ServiceRegistryImpl() {
255         super();
256         this.services = new ArrayList<Service>();
257         this.subscribed = new IdentityHashMap<Service, Integer>();
258         this.initializing = new IdentityHashMap<Service, List<InitializationEntry>>();
259         this.deinitializing = new IdentityHashMap<Service, List<DeinitializationEntry>>();
260     }
261 
262 }