1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package net.jini.lookup;
20
21 import java.io.IOException;
22 import java.rmi.RemoteException;
23 import java.rmi.server.ExportException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.FutureTask;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.PriorityBlockingQueue;
41 import java.util.concurrent.RunnableFuture;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicLong;
47 import java.util.concurrent.locks.ReentrantReadWriteLock;
48 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
49 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
50 import java.util.function.BiConsumer;
51 import java.util.function.BiFunction;
52 import java.util.logging.Level;
53 import net.jini.config.ConfigurationException;
54 import net.jini.core.constraint.MethodConstraints;
55 import net.jini.core.constraint.RemoteMethodControl;
56 import net.jini.core.entry.Entry;
57 import net.jini.core.event.RemoteEvent;
58 import net.jini.core.event.RemoteEventListener;
59 import net.jini.core.event.UnknownEventException;
60 import net.jini.core.lookup.ServiceEvent;
61 import net.jini.core.lookup.ServiceID;
62 import net.jini.core.lookup.ServiceItem;
63 import net.jini.core.lookup.ServiceMatches;
64 import net.jini.core.lookup.ServiceRegistrar;
65 import net.jini.core.lookup.ServiceTemplate;
66 import net.jini.export.Exporter;
67 import net.jini.io.MarshalledInstance;
68 import net.jini.jeri.AtomicILFactory;
69 import net.jini.jeri.BasicJeriExporter;
70 import net.jini.jeri.tcp.TcpServerEndpoint;
71 import net.jini.security.TrustVerifier;
72 import net.jini.security.proxytrust.ServerProxyTrust;
73 import org.apache.river.concurrent.RC;
74 import org.apache.river.concurrent.Ref;
75 import org.apache.river.concurrent.Referrer;
76 import org.apache.river.lookup.entry.LookupAttributes;
77 import org.apache.river.proxy.BasicProxyTrustVerifier;
78 import org.apache.river.thread.DependencyLinker;
79 import org.apache.river.thread.ExtensibleExecutorService;
80 import org.apache.river.thread.FutureObserver;
81 import org.apache.river.thread.NamedThreadFactory;
82 import org.apache.river.thread.ObservableFutureTask;
83
84
85
86
87
88
89 final class LookupCacheImpl implements LookupCache {
90
91 private static final int ITEM_ADDED = 0;
92 private static final int ITEM_REMOVED = 2;
93 private static final int ITEM_CHANGED = 3;
94
95 private final LookupListener lookupListener;
96
97 private volatile Exporter lookupListenerExporter;
98
99 private volatile RemoteEventListener lookupListenerProxy;
100
101
102
103 private volatile ExecutorService cacheTaskMgr;
104 private volatile CacheTaskDependencyManager cacheTaskDepMgr;
105
106 private volatile ExecutorService incomingEventExecutor;
107
108 private volatile boolean bCacheTerminated = false;
109
110 private final ReadLock sItemListenersRead;
111 private final WriteLock sItemListenersWrite;
112 private final Collection<ServiceDiscoveryListener> sItemListeners;
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 private final ConcurrentMap<ServiceID, ServiceItemReg> serviceIdMap;
135
136
137
138 private final ConcurrentMap<ProxyReg, EventReg> eventRegMap;
139
140 private final ServiceTemplate tmpl;
141
142 private final ServiceItemFilter filter;
143
144 private final long leaseDuration;
145
146
147
148 private final long startTime;
149
150
151
152 private volatile ScheduledExecutorService serviceDiscardTimerTaskMgr;
153 private final ConcurrentMap<ServiceID, Future> serviceDiscardFutures;
154
155
156
157
158
159
160
161 private final AtomicLong taskSeqN;
162 private final ServiceDiscoveryManager sdm;
163 private final boolean useInsecureLookup;
164
165 LookupCacheImpl(ServiceTemplate tmpl, ServiceItemFilter filter,
166 ServiceDiscoveryListener sListener, long leaseDuration,
167 ServiceDiscoveryManager sdm, boolean useInsecureLookup)
168 throws RemoteException
169 {
170 this.useInsecureLookup = useInsecureLookup;
171 this.taskSeqN = new AtomicLong();
172 this.startTime = System.currentTimeMillis();
173 this.eventRegMap = new ConcurrentHashMap<ProxyReg, EventReg>();
174 this.serviceIdMap = new ConcurrentHashMap<ServiceID, ServiceItemReg>();
175 this.sItemListeners = new HashSet<ServiceDiscoveryListener>();
176 this.serviceDiscardFutures = RC.concurrentMap(new ConcurrentHashMap<Referrer<ServiceID>, Referrer<Future>>(), Ref.WEAK_IDENTITY, Ref.STRONG, 60000, 60000);
177 this.tmpl = tmpl.clone();
178 this.leaseDuration = leaseDuration;
179 this.filter = filter;
180 lookupListener = new LookupListener();
181 if (sListener != null) {
182 sItemListeners.add(sListener);
183 }
184 this.sdm = sdm;
185 ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
186 sItemListenersRead = rwl.readLock();
187 sItemListenersWrite = rwl.writeLock();
188 }
189
190 private ExecutorService eventNotificationExecutor;
191
192
193
194
195
196
197
198
199
200 private final class LookupListener implements RemoteEventListener,
201 ServerProxyTrust {
202
203 RemoteEventListener export() throws ExportException {
204 return (RemoteEventListener) lookupListenerExporter.export(this);
205 }
206
207 @Override
208 public void notify(RemoteEvent evt) throws UnknownEventException,
209 java.rmi.RemoteException {
210 if (!(evt instanceof ServiceEvent)) {
211 throw new UnknownEventException("ServiceEvent required,not: " + evt.toString());
212 }
213 notifyServiceMap((ServiceEvent) evt);
214 }
215
216
217
218
219
220 @Override
221 public TrustVerifier getProxyVerifier() {
222 return new BasicProxyTrustVerifier(lookupListenerProxy);
223 }
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249 private static final class RegisterListenerTask extends
250 CacheTask {
251
252 final LookupCacheImpl cache;
253
254 public RegisterListenerTask(ProxyReg reg,
255 long seqN, LookupCacheImpl cache) {
256 super(reg, seqN);
257 this.cache = cache;
258 }
259
260 @Override
261 public boolean hasDeps() {
262 return true;
263 }
264
265 @Override
266 public boolean dependsOn(CacheTask t) {
267 if (t instanceof ProxyRegDropTask) {
268 ProxyReg r = getProxyReg();
269 if (r != null && r.equals(t.getProxyReg())) {
270 if (t.getSeqN() < getSeqN()) {
271 return true;
272 }
273 }
274 }
275 return false;
276 }
277
278 @Override
279 public void run() {
280 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
281 ServiceDiscoveryManager.log(Level.FINER,
282 "ServiceDiscoveryManager - RegisterListenerTask started");
283 }
284 long duration = cache.getLeaseDuration();
285 if (duration < 0) {
286 return;
287 }
288 try {
289 EventReg eventReg
290 = cache.sdm.registerListener(
291 reg.getProxy(),
292 cache.tmpl,
293 cache.lookupListenerProxy,
294 duration
295 );
296
297
298
299
300 if (cache.bCacheTerminated
301 || Thread.currentThread().isInterrupted()) {
302
303 cache.sdm.cancelLease(eventReg.lease);
304 } else {
305 eventReg.suspendEvents();
306 EventReg existed
307 = cache.eventRegMap.putIfAbsent(reg, eventReg);
308 if (existed != null){
309 cache.sdm.cancelLease(eventReg.lease);
310 } else {
311 try {
312
313 cache.lookup(reg);
314 } finally {
315 synchronized (eventReg){
316 eventReg.releaseEvents();
317 eventReg.notify();
318 }
319 }
320 }
321
322 }
323 } catch (Exception e) {
324 cache.sdm.fail(e,
325 reg.getProxy(),
326 this.getClass().getName(),
327 "run",
328 "Exception occurred while attempting to register with the lookup service event mechanism",
329 cache.bCacheTerminated
330 );
331 } finally {
332 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
333 ServiceDiscoveryManager.log(Level.FINER,
334 "ServiceDiscoveryManager - RegisterListenerTask completed");
335 }
336 }
337 }
338 }
339
340
341
342
343
344 private static final class ProxyRegDropTask extends CacheTask {
345
346 final LookupCacheImpl cache;
347 final EventReg eReg;
348
349 public ProxyRegDropTask(ProxyReg reg,
350 EventReg eReg,
351 long seqN,
352 LookupCacheImpl cache) {
353 super(reg, seqN);
354 this.cache = cache;
355 this.eReg = eReg;
356 }
357
358 @Override
359 public void run() {
360 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
361 ServiceDiscoveryManager.log(Level.FINEST,
362 "ServiceDiscoveryManager - ProxyRegDropTask started");
363 }
364
365
366
367 synchronized (eReg){
368 while (eReg.eventsSuspended()) {
369
370
371 try {
372 eReg.wait(200L);
373 } catch (InterruptedException e){
374 Thread.currentThread().interrupt();
375 }
376 }
377
378
379
380
381 if (eReg.discard()) cache.eventRegMap.remove(reg, eReg);
382 }
383
384
385
386
387
388
389 Iterator<Map.Entry<ServiceID, ServiceItemReg>> iter = cache.serviceIdMap.entrySet().iterator();
390 while (iter.hasNext()) {
391 Map.Entry<ServiceID, ServiceItemReg> e = iter.next();
392 ServiceID srvcID = e.getKey();
393 DissociateLusCleanUpOrphan dlcl = new DissociateLusCleanUpOrphan(cache, reg.getProxy());
394 cache.serviceIdMap.computeIfPresent(srvcID, dlcl);
395 if (dlcl.itemRegProxy != null) {
396 cache.itemMatchMatchChange(srvcID, dlcl.itmReg, dlcl.itemRegProxy, dlcl.newItem, false);
397 } else if (dlcl.notify && dlcl.filteredItem != null) {
398 cache.removeServiceNotify(dlcl.filteredItem);
399 }
400 }
401 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
402 ServiceDiscoveryManager.log(Level.FINEST,
403 "ServiceDiscoveryManager - ProxyRegDropTask completed");
404 }
405 }
406
407 @Override
408 public boolean hasDeps() {
409 return true;
410 }
411
412 @Override
413 public boolean dependsOn(CacheTask t) {
414 if (t instanceof RegisterListenerTask || t instanceof ProxyRegDropTask) {
415 ProxyReg r = getProxyReg();
416 if (r != null && r.equals(t.getProxyReg())) {
417 if (t.getSeqN() < getSeqN()) {
418 return true;
419 }
420 }
421 }
422 return false;
423 }
424 }
425
426
427
428
429
430
431
432 private static final class ServiceDiscardTimerTask implements Runnable {
433
434 private final ServiceID serviceID;
435 private final LookupCacheImpl cache;
436
437 public ServiceDiscardTimerTask(LookupCacheImpl cache, ServiceID serviceID) {
438 this.serviceID = serviceID;
439 this.cache = cache;
440 }
441
442 @Override
443 public void run() {
444 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
445 ServiceDiscoveryManager.log(Level.FINEST,
446 "ServiceDiscoveryManager - ServiceDiscardTimerTask started");
447 }
448 try {
449
450 if (cache.bCacheTerminated) {
451 return;
452 }
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473 ServiceItemReg itemReg = cache.serviceIdMap.get(serviceID);
474 if (itemReg != null) {
475
476
477
478
479
480 ServiceItem itemToSend;
481 if (!itemReg.isDiscarded()) return;
482 ServiceItem item = null;
483 ServiceItem filteredItem = null;
484 boolean addFilteredItemToMap = false;
485 boolean remove = false;
486 boolean notify = true;
487 itemToSend = itemReg.getFilteredItem();
488 if (itemToSend == null) {
489 item = itemReg.getItem();
490 filteredItem = item.clone();
491
492 if (cache.useInsecureLookup){
493 if (ServiceDiscoveryManager.filterPassed(filteredItem, cache.filter)) {
494 addFilteredItemToMap = true;
495 } else {
496
497 remove = true;
498 notify = false;
499 }
500 } else {
501
502
503 try {
504 if(ServiceDiscoveryManager.filterPassed(filteredItem, cache.filter)){
505 addFilteredItemToMap = true;
506 } else {
507
508 remove = true;
509 notify = false;
510 }
511 } catch (SecurityException ex){
512 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
513 ServiceDiscoveryManager.log(Level.FINE,
514 "Exception caught, while attempting to filter a bootstrap proxy", ex);
515 }
516 try {
517 filteredItem.service = ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
518 if(ServiceDiscoveryManager.filterPassed(filteredItem, cache.filter)){
519 addFilteredItemToMap = true;
520 } else {
521
522 remove = true;
523 notify = false;
524 }
525 } catch (RemoteException ex1) {
526 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
527 ServiceDiscoveryManager.log(Level.FINE,
528 "Exception caught, while attempting to filter a bootstrap proxy", ex1);
529 }
530
531 remove = true;
532 notify = false;
533 }
534 } catch (ClassCastException ex){
535 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
536 ServiceDiscoveryManager.log(Level.FINE,
537 "Exception caught, while attempting to filter a bootstrap proxy", ex);
538 }
539 try {
540 filteredItem.service = ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
541 if(ServiceDiscoveryManager.filterPassed(filteredItem, cache.filter)){
542 addFilteredItemToMap = true;
543 } else {
544
545 remove = true;
546 notify = false;
547 }
548 } catch (RemoteException ex1) {
549 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
550 ServiceDiscoveryManager.log(Level.FINE,
551 "Exception caught, while attempting to filter a bootstrap proxy", ex1);
552 }
553
554 remove = true;
555 notify = false;
556 }
557 }
558 }
559 }
560
561
562
563
564
565
566
567
568
569 AddOrRemove aor =
570 new AddOrRemove(cache, item, filteredItem,
571 itemToSend, addFilteredItemToMap,
572 remove, notify
573 );
574 cache.serviceIdMap.computeIfPresent(serviceID, aor);
575 if (aor.notify) cache.addServiceNotify(aor.itemToSend);
576 }
577 } finally {
578 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
579 ServiceDiscoveryManager.log(Level.FINEST,
580 "ServiceDiscoveryManager - ServiceDiscardTimerTask completed");
581 }
582 }
583 }
584 }
585
586
587
588
589 private static class AddOrRemove
590 implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg>
591 {
592 final LookupCacheImpl cache;
593 final ServiceItem item;
594 final ServiceItem filteredItem;
595 final boolean addFilteredItemToMap;
596 final boolean remove;
597 boolean notify;
598 ServiceItem itemToSend;
599
600 AddOrRemove(LookupCacheImpl cache, ServiceItem item,
601 ServiceItem filteredItem, ServiceItem itemToSend,
602 boolean addFilteredItemToMap, boolean remove, boolean notify)
603 {
604 this.cache = cache;
605 this.item = item;
606 this.filteredItem = filteredItem;
607 this.itemToSend = itemToSend;
608 this.addFilteredItemToMap = addFilteredItemToMap;
609 this.remove = remove;
610 this.notify = notify;
611 }
612
613 @Override
614 public ServiceItemReg apply(ServiceID serviceID, ServiceItemReg itemReg) {
615 if (!itemReg.unDiscard()){
616 notify = false;
617 return itemReg;
618 }
619
620
621
622
623
624
625
626
627
628 if (addFilteredItemToMap){
629 itemReg.replaceProxyUsedToTrackChange(null, item);
630 itemReg.setFilteredItem(filteredItem);
631 itemToSend = filteredItem;
632 return itemReg;
633 } else if (remove){
634 return null;
635 }
636 return itemReg;
637 }
638
639 }
640
641
642 @Override
643 public void terminate() {
644 synchronized (this) {
645 if (bCacheTerminated) {
646 return;
647 }
648 bCacheTerminated = true;
649 }
650 sdm.removeLookupCache(this);
651
652 cacheTaskMgr.shutdownNow();
653
654 serviceDiscardTimerTaskMgr.shutdownNow();
655 eventNotificationExecutor.shutdownNow();
656
657 Set set = eventRegMap.entrySet();
658 Iterator iter = set.iterator();
659 while (iter.hasNext()) {
660 Map.Entry e = (Map.Entry) iter.next();
661 EventReg eReg = (EventReg) e.getValue();
662 sdm.cancelLease(eReg.lease);
663 }
664
665 try {
666 lookupListenerExporter.unexport(true);
667 } catch (IllegalStateException e) {
668 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
669 ServiceDiscoveryManager.log(
670 Level.FINEST,
671 "IllegalStateException occurred while unexporting the cache's remote event listener",
672 e
673 );
674 }
675 }
676 incomingEventExecutor.shutdownNow();
677 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
678 ServiceDiscoveryManager.log(
679 Level.FINEST,
680 "ServiceDiscoveryManager - LookupCache terminated"
681 );
682 }
683 }
684
685
686 @Override
687 public ServiceItem lookup(ServiceItemFilter myFilter) {
688 checkCacheTerminated();
689 ServiceItem[] ret = getServiceItems(myFilter);
690 if (ret.length == 0) {
691 return null;
692 }
693
694
695 int rand = sdm.random.nextInt(ret.length);
696 return ret[rand];
697 }
698
699
700 @Override
701 public ServiceItem[] lookup(ServiceItemFilter myFilter, int maxMatches) {
702 checkCacheTerminated();
703 if (maxMatches < 1) {
704 throw new IllegalArgumentException("maxMatches must be > 0");
705 }
706 ServiceItem[] sa = getServiceItems(myFilter);
707 int len = sa.length;
708 if (len == 0 || len <= maxMatches) return sa;
709 List<ServiceItem> items = new LinkedList<ServiceItem>();
710 int rand = sdm.random.nextInt(Integer.MAX_VALUE) % len;
711 for (int i = 0; i < len; i++) {
712 items.add(sa[(i + rand) % len]);
713 if (items.size() == maxMatches) {
714 break;
715 }
716 }
717 ServiceItem[] ret = new ServiceItem[items.size()];
718 items.toArray(ret);
719 return ret;
720 }
721
722
723 @Override
724 public void discard(Object serviceReference) {
725 checkCacheTerminated();
726
727
728
729
730
731 Iterator<Map.Entry<ServiceID, ServiceItemReg>> iter = serviceIdMap.entrySet().iterator();
732 while (iter.hasNext()) {
733 Map.Entry<ServiceID, ServiceItemReg> e = iter.next();
734 ServiceItemReg itmReg = e.getValue();
735 ServiceID sid = e.getKey();
736 ServiceItem filteredItem = itmReg.getFilteredItem();
737 if (filteredItem != null && (filteredItem.service).equals(serviceReference)) {
738 Discard dis = new Discard(this, itmReg, filteredItem, sdm.getDiscardWait());
739 serviceIdMap.computeIfPresent(sid, dis);
740 }
741 }
742 }
743
744 private static class Discard
745 implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg>{
746
747 LookupCacheImpl cache;
748 ServiceItemReg expected;
749 ServiceItem filteredItem;
750 long discardWait;
751
752 Discard(LookupCacheImpl cache, ServiceItemReg itmReg, ServiceItem filteredItem, long discardWait){
753 this.cache = cache;
754 this.expected = itmReg;
755 this.filteredItem = filteredItem;
756 this.discardWait = discardWait;
757 }
758
759 @Override
760 public ServiceItemReg apply(ServiceID sid, ServiceItemReg itmReg) {
761 if (!expected.equals(itmReg)) return itmReg;
762 if (itmReg.discard()) {
763 Future f =
764 cache.serviceDiscardTimerTaskMgr.schedule(
765 new ServiceDiscardTimerTask(cache, sid),
766 discardWait,
767 TimeUnit.MILLISECONDS
768 );
769 cache.serviceDiscardFutures.put(sid, f);
770 cache.removeServiceNotify(filteredItem);
771 }
772 return itmReg;
773 }
774 }
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799 private ServiceItem[] getServiceItems(ServiceItemFilter filter2) {
800 FilteredItems items = new FilteredItems(this, filter2);
801 serviceIdMap.forEach(items);
802 return items.result();
803 }
804
805 private static class FilteredItems implements BiConsumer<ServiceID, ServiceItemReg> {
806
807 private final List<ServiceItem> items;
808 private final ServiceItemFilter filter2;
809 private final LookupCacheImpl cache;
810
811 FilteredItems(LookupCacheImpl cache, ServiceItemFilter filter2){
812 this.items = new LinkedList<ServiceItem>();
813 this.filter2 = filter2;
814 this.cache = cache;
815 }
816
817 @Override
818 public void accept(ServiceID sid, ServiceItemReg itemReg) {
819 ServiceItem itemToFilter = itemReg.getFilteredItem();
820 if ((itemToFilter == null) || (itemReg.isDiscarded())) return;
821
822
823
824
825 itemToFilter = itemToFilter.clone();
826
827 boolean pass = (filter2 == null) || (filter2.check(itemToFilter));
828
829 if (!pass) return;
830
831 if (itemToFilter.service != null) {
832 items.add(itemToFilter);
833 return;
834 }
835
836 Discard dis =
837 new Discard(
838 cache,
839 itemReg,
840 itemToFilter,
841 cache.sdm.getDiscardWait()
842 );
843 cache.serviceIdMap.computeIfPresent(sid, dis);
844 }
845
846 ServiceItem[] result(){
847 ServiceItem[] ret = new ServiceItem[items.size()];
848 items.toArray(ret);
849 return ret;
850 }
851
852 }
853
854 ServiceItem [] processBootStrapProxys(Object [] proxys){
855 int length = proxys.length;
856 Collection<ServiceItem> result = new ArrayList<ServiceItem>(length);
857 for (int i = 0; i < length; i++){
858 Object bootstrap;
859 Entry [] attributes;
860 ServiceID id;
861 try {
862 bootstrap = sdm.bootstrapProxyPreparer.prepareProxy(proxys[i]);
863 attributes = ((ServiceAttributesAccessor) bootstrap).getServiceAttributes();
864 id = ((ServiceIDAccessor) bootstrap).serviceID();
865 result.add(new ServiceItem(id, bootstrap, attributes));
866 } catch (IOException ex) { }
867 }
868 return result.toArray(new ServiceItem[result.size()]);
869 }
870
871
872 @Override
873 public void addListener(ServiceDiscoveryListener listener) {
874 checkCacheTerminated();
875 if (listener == null) {
876 throw new NullPointerException("can't add null listener");
877 }
878
879 ServiceItem[] items = getServiceItems(null);
880 boolean added;
881 sItemListenersWrite.lock();
882 try {
883 added = sItemListeners.add(listener);
884 } finally {
885 sItemListenersWrite.unlock();
886 }
887 if (added){
888 for (int i = 0, l = items.length; i < l; i++) {
889 addServiceNotify(items[i], listener);
890 }
891 }
892 }
893
894
895 @Override
896 public void removeListener(ServiceDiscoveryListener listener) {
897 checkCacheTerminated();
898 if (listener == null) {
899 return;
900 }
901 sItemListenersWrite.lock();
902 try {
903 sItemListeners.remove(listener);
904 } finally {
905 sItemListenersWrite.unlock();
906 }
907 }
908
909
910
911
912
913
914
915 public void addProxyReg(ProxyReg reg) {
916 RegisterListenerTask treg = new RegisterListenerTask(reg, taskSeqN.getAndIncrement(), this);
917 cacheTaskDepMgr.submit(treg);
918 }
919
920
921
922
923
924
925
926 public void removeProxyReg(ProxyReg reg) {
927 ProxyRegDropTask t;
928
929 EventReg eReg = eventRegMap.get(reg);
930 if (eReg != null) {
931 try {
932 sdm.leaseRenewalMgr.remove(eReg.lease);
933 } catch (Exception e) {
934 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
935 ServiceDiscoveryManager.log(
936 Level.FINER,
937 "exception occurred while removing an event registration lease",
938 e
939 );
940 }
941 }
942 t = new ProxyRegDropTask(reg, eReg, taskSeqN.getAndIncrement(), this);
943 cacheTaskDepMgr.removeUselessTask(reg);
944 cacheTaskDepMgr.submit(t);
945 }
946 }
947
948
949
950
951 private void checkCacheTerminated() {
952 sdm.checkTerminated();
953 if (bCacheTerminated) {
954 throw new IllegalStateException("this lookup cache was terminated");
955 }
956 }
957
958
959
960
961
962
963 private void notifyServiceMap(ServiceEvent theEvent){
964 if (theEvent.getSource() == null) {
965 return;
966 }
967 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
968 ServiceDiscoveryManager.log(
969 Level.FINE,
970 "HandleServiceEventTask submitted"
971 );
972 }
973 incomingEventExecutor.submit(new HandleServiceEventTask(this, theEvent));
974 }
975
976
977
978
979
980
981
982
983
984
985 private static class HandleServiceEventTask implements Runnable, Comparable {
986
987 private final LookupCacheImpl cache;
988 private final ServiceEvent theEvent;
989
990
991 private volatile ProxyReg reg;
992 private volatile EventReg eReg;
993 private volatile long timestamp;
994 private volatile ServiceItem item;
995
996 HandleServiceEventTask(LookupCacheImpl cache, ServiceEvent event){
997 this.cache = cache;
998 this.theEvent = event;
999 }
1000
1001 @Override
1002 public void run() {
1003 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1004 ServiceDiscoveryManager.log(Level.FINER,"HandleServiceEventTask started");
1005 }
1006 try {
1007 if (item == null){
1008 if (cache.useInsecureLookup){
1009 item = theEvent.getServiceItem();
1010 } else {
1011
1012
1013
1014
1015
1016 Object proxy = theEvent.getBootstrapProxy();
1017 if (proxy != null){
1018 Entry [] attributes;
1019 try {
1020 proxy = cache.sdm.bootstrapProxyPreparer.prepareProxy(proxy);
1021
1022
1023
1024
1025
1026
1027 attributes = ((ServiceAttributesAccessor)proxy).getServiceAttributes();
1028 item = new ServiceItem(theEvent.getServiceID(), proxy, attributes);
1029 } catch (IOException ex) {
1030 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1031 ServiceDiscoveryManager.log(
1032 Level.FINE,
1033 "exception thrown while attempting to establish contact via a bootstrap proxy",
1034 ex
1035 );
1036 }
1037
1038 }
1039 }
1040 }
1041 }
1042
1043 FIND_ProxyReg: while (reg == null || eReg == null) {
1044 Set<Map.Entry<ProxyReg, EventReg>> set = cache.eventRegMap.entrySet();
1045 Iterator<Map.Entry<ProxyReg, EventReg>> iter = set.iterator();
1046 while (iter.hasNext()) {
1047 Map.Entry<ProxyReg, EventReg> e = iter.next();
1048 eReg = e.getValue();
1049 if (theEvent.getID() == eReg.eventID && theEvent.getSource().equals(eReg.source)) {
1050 reg = e.getKey();
1051 break FIND_ProxyReg;
1052 }
1053 }
1054 try {
1055 cache.incomingEventExecutor.submit(this);
1056 Thread.sleep(50L);
1057 return;
1058 } catch (InterruptedException ex) {
1059 Thread.currentThread().interrupt();
1060 return;
1061 }
1062 }
1063 if (timestamp == 0){
1064 timestamp = System.currentTimeMillis();
1065 } else {
1066
1067 if (!cache.eventRegMap.containsKey(reg)) return;
1068 }
1069 long currentTime = System.currentTimeMillis();
1070 long delta = 0;
1071 boolean resubmit = false;
1072 int waiting = 0;
1073 synchronized (eReg){
1074 if (eReg.discarded()) return;
1075 if (eReg.nonContiguousEvent(theEvent.getSequenceNumber())
1076 && (currentTime - timestamp < 500))
1077 {
1078 resubmit = true;
1079 eReg.notifyAll();
1080 } else {
1081 while (eReg.eventsSuspended()){
1082 try {
1083 waiting ++;
1084 eReg.wait(100L);
1085 waiting --;
1086 if (eReg.discarded()) return;
1087
1088 if (waiting > 0 && eReg.nonContiguousEvent(
1089 theEvent.getSequenceNumber()))
1090 {
1091 eReg.notifyAll();
1092 resubmit = true;
1093 break;
1094 }
1095 } catch (InterruptedException ex) {
1096 Thread.currentThread().interrupt();
1097 return;
1098 }
1099 }
1100 if (!resubmit){
1101 eReg.suspendEvents();
1102 delta = eReg.updateSeqNo(theEvent.getSequenceNumber());
1103 }
1104 }
1105 }
1106 if (resubmit){
1107 try {
1108 cache.incomingEventExecutor.submit(this);
1109 Thread.sleep(50L);
1110 } catch (InterruptedException ex) {
1111 Thread.currentThread().interrupt();
1112 return;
1113 }
1114 return;
1115 }
1116 try {
1117 cache.notifyServiceMap(delta,
1118 theEvent.getServiceID(),
1119 item,
1120 theEvent.getTransition(),
1121 reg);
1122 } finally {
1123 synchronized (eReg){
1124 eReg.releaseEvents();
1125 eReg.notifyAll();
1126 }
1127 }
1128 } catch (RuntimeException e){
1129 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
1130 ServiceDiscoveryManager.log(Level.FINER, "HandleServiceEventTask threw a RuntimeException", e);
1131 } finally {
1132 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
1133 ServiceDiscoveryManager.log(Level.FINER, "HandleServiceEventTask completed");
1134 }
1135 }
1136
1137 @Override
1138 public int compareTo(Object o) {
1139 if (!(o instanceof HandleServiceEventTask)) return 0;
1140 HandleServiceEventTask that = (HandleServiceEventTask) o;
1141 long dif = this.theEvent.getSequenceNumber() - that.theEvent.getSequenceNumber();
1142 if (dif == 0) return 0;
1143 if (dif < 0) return -1;
1144 return 1;
1145 }
1146 }
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184 private void notifyServiceMap(long delta,
1185 ServiceID sid,
1186 ServiceItem item,
1187 int transition,
1188 ProxyReg reg)
1189 {
1190
1191 if (delta == 1) {
1192
1193 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1194 ServiceDiscoveryManager.log(
1195 Level.FINE,
1196 "No gap, handle current ServiceEvent, ServiceID: {0} transition: {1}",
1197 new Object[]{sid, transition}
1198 );
1199 }
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211 if ((item != null) && (item.service == null)) {
1212 return;
1213 }
1214
1215
1216
1217
1218 if (transition == ServiceRegistrar.TRANSITION_MATCH_NOMATCH)
1219 {
1220 handleMatchNoMatch(reg.getProxy(), sid);
1221 } else if (transition == ServiceRegistrar.TRANSITION_NOMATCH_MATCH
1222 || transition == ServiceRegistrar.TRANSITION_MATCH_MATCH)
1223 {
1224 newOldService(reg, sid, item, transition == ServiceRegistrar.TRANSITION_MATCH_MATCH);
1225 }
1226 return;
1227 }
1228 if (delta == 0) {
1229 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1230 ServiceDiscoveryManager.log(
1231 Level.FINE,
1232 "Repeat ServiceEvent, ignore, ServiceID: {0} transition: {1}",
1233 new Object[]{sid, transition}
1234 );
1235 }
1236 return;
1237 }
1238 if (delta < 0) {
1239 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1240 ServiceDiscoveryManager.log(
1241 Level.FINE,
1242 "Old ServiceEvent, ignore, ServiceID: {0} transition: {1}",
1243 new Object[]{sid, transition}
1244 );
1245 }
1246 return;
1247 }
1248
1249 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1250 ServiceDiscoveryManager.log(
1251 Level.FINE,
1252 "Gap in ServiceEvent sequence, performing lookup, ServiceID: {0} transition: {1}",
1253 new Object[]{sid, transition}
1254 );
1255 }
1256 lookup(reg);
1257 }
1258
1259
1260
1261
1262
1263 private void lookup(ProxyReg reg) {
1264 ServiceRegistrar proxy = reg.getProxy();
1265 ServiceItem[] items;
1266
1267 try {
1268 if (useInsecureLookup){
1269 ServiceMatches matches = proxy.lookup(tmpl, Integer.MAX_VALUE);
1270 items = matches.items;
1271 } else {
1272 Object [] matches = ((SafeServiceRegistrar) proxy).lookUp(
1273 tmpl, Integer.MAX_VALUE);
1274 items = processBootStrapProxys(matches);
1275 }
1276 } catch (Exception e) {
1277
1278
1279 sdm.fail(e, proxy, this.getClass().getName(), "run", "Exception occurred during call to lookup", bCacheTerminated);
1280 return;
1281 }
1282 if (items == null) {
1283 throw new AssertionError("spec violation in queried lookup service: ServicesMatches instance returned by call to lookup() method contains null 'items' field");
1284 }
1285
1286
1287 Iterator<Map.Entry<ServiceID, ServiceItemReg>> iter = serviceIdMap.entrySet().iterator();
1288 while (iter.hasNext()) {
1289 Map.Entry<ServiceID, ServiceItemReg> e = iter.next();
1290 ServiceID srvcID = e.getKey();
1291 ServiceItem itemInSnapshot = findItem(srvcID, items);
1292 if (itemInSnapshot != null) continue;
1293 if (Thread.currentThread().isInterrupted()) return;
1294 DissociateLusCleanUpOrphan dlcl = new DissociateLusCleanUpOrphan(this, reg.getProxy());
1295 serviceIdMap.computeIfPresent(srvcID, dlcl);
1296 if (dlcl.itemRegProxy != null) {
1297 itemMatchMatchChange(srvcID, dlcl.itmReg, dlcl.itemRegProxy, dlcl.newItem, false);
1298 } else if (dlcl.notify && dlcl.filteredItem != null) {
1299 removeServiceNotify(dlcl.filteredItem);
1300 }
1301 }
1302
1303 for (int i = 0, l = items.length; i < l; i++) {
1304
1305 if (items[i].service == null) {
1306 continue;
1307 }
1308 if (items[i].serviceID == null && !useInsecureLookup){
1309 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
1310 ServiceDiscoveryManager.log(Level.FINE,
1311 "ServiceItem contained null serviceID field, attempting to retrieve again");
1312 try {
1313 ServiceID id = ((ServiceIDAccessor)items[i].service).serviceID();
1314
1315 if (id == null) continue;
1316 items[i].serviceID = id;
1317 } catch ( IOException e){
1318 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
1319 ServiceDiscoveryManager.log(Level.FINE,
1320 "ServiceItem contained null serviceID field, attempt to retrieve again failed, ignoring",
1321 e);
1322 continue;
1323 }
1324 }
1325 newOldService(reg, items[i].serviceID, items[i], false);
1326 }
1327 }
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355 private void newOldService(ProxyReg reg, ServiceID id, ServiceItem item, boolean matchMatchEvent) {
1356 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1357 ServiceDiscoveryManager.log(
1358 Level.FINE,
1359 "newOldService called, ServiceItem: {0}",
1360 new Object[]{item}
1361 );
1362 }
1363 try {
1364 boolean previouslyDiscovered = false;
1365 ServiceItemReg itemReg;
1366 itemReg = serviceIdMap.get(id);
1367 if (itemReg == null) {
1368 if (!eventRegMap.containsKey(reg)) {
1369
1370 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
1371 ServiceDiscoveryManager.log(
1372 Level.FINER,
1373 "eventRegMap doesn't contain ProxyReg, returning, ServiceItem: {0}",
1374 new Object[]{item}
1375 );
1376 return;
1377 }
1378
1379 itemReg = new ServiceItemReg(reg.getProxy(), item);
1380 ServiceItemReg existed = serviceIdMap.putIfAbsent(id, itemReg);
1381 if (existed != null) {
1382 itemReg = existed;
1383 if (itemReg.isDiscarded()) {
1384 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1385 ServiceDiscoveryManager.log(
1386 Level.FINER,
1387 "newOldService, discarded returning, ServiceItem: {0}",
1388 new Object[]{item}
1389 );
1390 }
1391 return;
1392 }
1393 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1394 ServiceDiscoveryManager.log(
1395 Level.FINER,
1396 "newOldService, previously discovered, ServiceItem: {0}",
1397 new Object[]{item}
1398 );
1399 }
1400 previouslyDiscovered = true;
1401 }
1402 } else if (itemReg.isDiscarded()) {
1403 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1404 ServiceDiscoveryManager.log(
1405 Level.FINER,
1406 "newOldService, discarded returning, ServiceItem: {0}",
1407 new Object[]{item}
1408 );
1409 }
1410 return;
1411 } else {
1412 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1413 ServiceDiscoveryManager.log(
1414 Level.FINER,
1415 "newOldService, previously discovered, ServiceItem: {0}",
1416 new Object[]{item}
1417 );
1418 }
1419 previouslyDiscovered = true;
1420 }
1421 if (previouslyDiscovered) {
1422
1423 itemMatchMatchChange(id, itemReg, reg.getProxy(), item, matchMatchEvent);
1424 } else {
1425
1426 ServiceItem newFilteredItem
1427 = filterMaybeDiscard(id, itemReg, item, false);
1428
1429 if (newFilteredItem != null) {
1430 addServiceNotify(newFilteredItem);
1431 }
1432 }
1433 } catch (RuntimeException e) {
1434 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
1435 ServiceDiscoveryManager.log(Level.FINE, "Runtime exception thrown in newOldService call", e);
1436 } finally {
1437 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
1438 ServiceDiscoveryManager.log(Level.FINER,
1439 "newOldService call complete, ServiceItem: {0}",
1440 new Object[]{item});
1441 }
1442 }
1443
1444
1445
1446
1447 private ServiceItem findItem(ServiceID sid, ServiceItem[] items) {
1448 if (items != null) {
1449 for (int i = 0, length = items.length; i < length; i++) {
1450 if (sid.equals(items[i].serviceID)) {
1451 return items[i];
1452 }
1453 }
1454 }
1455 return null;
1456 }
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528 private void itemMatchMatchChange(ServiceID srvcID, ServiceItemReg itemReg, ServiceRegistrar proxy, ServiceItem newItem, boolean matchMatchEvent) {
1529
1530
1531
1532 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1533 ServiceDiscoveryManager.log(
1534 Level.FINE,
1535 "itemMatchMatchChange called, ServiceID: {0} ",
1536 new Object[]{srvcID}
1537 );
1538 }
1539 PreEventState pev = new PreEventState(proxy,itemReg,newItem,matchMatchEvent);
1540 ServiceItem newFilteredItem;
1541 serviceIdMap.computeIfPresent(srvcID, pev);
1542 if (pev.needToFilter){
1543
1544 newFilteredItem = filterMaybeDiscard(srvcID, itemReg, newItem, pev.notDiscarded);
1545 if (newFilteredItem != null) {
1546
1547 if (pev.attrsChanged && pev.oldFilteredItem != null) {
1548 changeServiceNotify(newFilteredItem, pev.oldFilteredItem);
1549 }
1550 if (pev.versionChanged) {
1551 if (pev.notDiscarded && pev.oldFilteredItem != null) {
1552 removeServiceNotify(pev.oldFilteredItem);
1553 }
1554 addServiceNotify(newFilteredItem);
1555 }
1556 }
1557 }
1558 }
1559
1560 private static class PreEventState implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg> {
1561
1562 private final ServiceRegistrar proxy;
1563 private final ServiceItemReg reg;
1564 private final ServiceItem newItem;
1565 private final boolean matchMatchEvent;
1566 ServiceItem oldItem;
1567 ServiceItem oldFilteredItem;
1568 boolean notDiscarded;
1569 boolean attrsChanged = false;
1570 boolean versionChanged = false;
1571 boolean needToFilter = false;
1572 ServiceRegistrar proxyChanged = null;
1573
1574 PreEventState(ServiceRegistrar proxy, ServiceItemReg reg, ServiceItem newItem, boolean matchMatchEvent){
1575 this.proxy = proxy;
1576 this.reg = reg;
1577 this.newItem = newItem;
1578 this.matchMatchEvent = matchMatchEvent;
1579 }
1580
1581 @Override
1582 public ServiceItemReg apply(ServiceID t, ServiceItemReg itemReg) {
1583 boolean loggable = ServiceDiscoveryManager.logger.isLoggable(Level.FINER);
1584 if (! reg.equals(itemReg)) {
1585 if (loggable)
1586 ServiceDiscoveryManager.log(
1587 Level.FINER,
1588 "PreEventState.apply, ServiceItemReg's not equal, returning. ServiceID: {0}",
1589 new Object[]{t}
1590 );
1591 return itemReg;
1592 }
1593 notDiscarded = !itemReg.isDiscarded();
1594 oldItem = itemReg.getItem();
1595 oldFilteredItem = itemReg.getFilteredItem();
1596 if (itemReg.proxyNotUsedToTrackChange(proxy, newItem)) {
1597
1598 if (loggable)
1599 ServiceDiscoveryManager.log(
1600 Level.FINER,
1601 "PreEventState.apply, proxyNotUsedToTrackChange. ServiceID: {0}",
1602 new Object[]{t}
1603 );
1604 if (matchMatchEvent) {
1605 if (loggable)
1606 ServiceDiscoveryManager.log(
1607 Level.FINER,
1608 "PreEventState.apply, matchMatchEvent true returning. ServiceID: {0}",
1609 new Object[]{t}
1610 );
1611 return itemReg;
1612 }
1613 if (notDiscarded) {
1614 if (loggable)
1615 ServiceDiscoveryManager.log(
1616 Level.FINER,
1617 "PreEventState.apply, notifyServiceRemoved true returning. ServiceID: {0}",
1618 new Object[]{t}
1619 );
1620 return itemReg;
1621 }
1622 if (loggable)
1623 ServiceDiscoveryManager.log(
1624 Level.FINER,
1625 "PreEventState.apply, proxyChanged = proxy. ServiceID: {0}",
1626 new Object[]{t}
1627 );
1628 proxyChanged = proxy;
1629 }
1630 if (!notDiscarded) {
1631 if (loggable)
1632 ServiceDiscoveryManager.log(
1633 Level.FINER,
1634 "PreEventState.apply, !notifyServiceRemoved, replacProxyUsedToTrackChange ServiceID: {0}",
1635 new Object[]{t}
1636 );
1637 itemReg.replaceProxyUsedToTrackChange(proxyChanged, newItem);
1638 itemReg.setFilteredItem(null);
1639 itemReg.discard();
1640 if (matchMatchEvent) {
1641 return itemReg;
1642 }
1643 }
1644
1645
1646
1647 if (matchMatchEvent || sameVersion(newItem, oldItem)) {
1648 if (!notDiscarded) {
1649 if (loggable)
1650 ServiceDiscoveryManager.log(
1651 Level.FINER,
1652 "PreEventState.apply, matchMatchEvent || sameVersion && !notifyServiceRemoved return itemReg, no need to filter ServiceID: {0}",
1653 new Object[]{t}
1654 );
1655 return itemReg;
1656 }
1657
1658
1659
1660
1661
1662
1663
1664 attrsChanged = !LookupAttributes.equal(newItem.attributeSets, oldItem.attributeSets);
1665 if (!attrsChanged) {
1666 if (loggable)
1667 ServiceDiscoveryManager.log(
1668 Level.FINER,
1669 "PreEventState.apply, matchMatchEvent || sameVersion && !attrsChanged return itemReg, no need to filter ServiceID: {0}",
1670 new Object[]{t}
1671 );
1672 return itemReg;
1673 }
1674 } else {
1675
1676 if (loggable)
1677 ServiceDiscoveryManager.log(
1678 Level.FINER,
1679 "PreEventState.apply, !matchMatchEvent &&! sameVersion ==> re-registrattion, versionChanged. ServiceID: {0}",
1680 new Object[]{t}
1681 );
1682 versionChanged = true;
1683 }
1684 if (loggable)
1685 ServiceDiscoveryManager.log(
1686 Level.FINER,
1687 "PreEventState.apply, need to filter true. ServiceID: {0}",
1688 new Object[]{t}
1689 );
1690 needToFilter = true;
1691 return itemReg;
1692 }
1693
1694 }
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706 private static boolean sameVersion(ServiceItem item0, ServiceItem item1) {
1707 boolean fullyEqual = false;
1708 try {
1709 Object service0 = item0.service;
1710 Object service1 = item1.service;
1711 if (service0 instanceof RemoteMethodControl
1712 && service1 instanceof RemoteMethodControl)
1713 {
1714 MethodConstraints constraints = ((RemoteMethodControl)service0).getConstraints();
1715 service1 = ((RemoteMethodControl) service1).setConstraints(constraints);
1716 }
1717 MarshalledInstance mi0 = new MarshalledInstance(service0);
1718 MarshalledInstance mi1 = new MarshalledInstance(service1);
1719 fullyEqual = mi0.fullyEquals(mi1);
1720 } catch (IOException e) {
1721 if (ServiceDiscoveryManager.logger.isLoggable(Level.INFO)){
1722 ServiceDiscoveryManager.log(
1723 Level.INFO,
1724 "failure marshalling old and new services for equality check",
1725 e
1726 );
1727 }
1728 }
1729 return fullyEqual;
1730 }
1731
1732
1733
1734
1735 public long getLeaseDuration() {
1736 if (leaseDuration == Long.MAX_VALUE) {
1737 return Long.MAX_VALUE;
1738 }
1739 return leaseDuration + startTime - System.currentTimeMillis();
1740 }
1741
1742
1743
1744
1745 private void addServiceNotify(ServiceItem item) {
1746 serviceNotifyDo(null, item, ITEM_ADDED);
1747 }
1748
1749
1750
1751
1752
1753 private void addServiceNotify(ServiceItem item, ServiceDiscoveryListener srvcListener) {
1754 eventNotificationExecutor.execute(new ServiceNotifyDo(null, item, ITEM_ADDED, srvcListener, this));
1755 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)) {
1756 try {
1757 throw new Exception("Back Trace");
1758 } catch (Exception ex) {
1759 ex.fillInStackTrace();
1760 ServiceDiscoveryManager.log(
1761 Level.FINEST,
1762 "Log back trace",
1763 ex
1764 );
1765 }
1766 }
1767 }
1768
1769
1770
1771
1772 private void removeServiceNotify(ServiceItem item) {
1773 serviceNotifyDo(item, null, ITEM_REMOVED);
1774 }
1775
1776
1777
1778
1779
1780 private void changeServiceNotify(ServiceItem newItem, ServiceItem oldItem) {
1781 serviceNotifyDo(oldItem, newItem, ITEM_CHANGED);
1782 }
1783
1784
1785
1786
1787 private void serviceNotifyDo(ServiceItem oldItem, ServiceItem item, int action) {
1788 sItemListenersRead.lock();
1789 try {
1790 if (sItemListeners.isEmpty()) {
1791 return;
1792 }
1793 Iterator<ServiceDiscoveryListener> iter = sItemListeners.iterator();
1794 while (iter.hasNext()) {
1795 ServiceDiscoveryListener sl = iter.next();
1796 eventNotificationExecutor.execute(new ServiceNotifyDo(oldItem, item, action, sl, this));
1797 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)) {
1798 try {
1799 throw new Exception("Back Trace");
1800 } catch (Exception ex) {
1801 ex.fillInStackTrace();
1802 ServiceDiscoveryManager.log(
1803 Level.FINEST,
1804 "Log back trace",
1805 ex
1806 );
1807 }
1808 }
1809 }
1810 } finally {
1811 sItemListenersRead.unlock();
1812 }
1813 }
1814
1815
1816
1817
1818
1819 private static class ServiceNotifyDo implements Runnable {
1820
1821 final ServiceItem oldItem;
1822 final ServiceItem item;
1823 final int action;
1824 final ServiceDiscoveryListener sl;
1825 final Object lookupCache;
1826
1827 ServiceNotifyDo(ServiceItem oldItem, ServiceItem item, int action, ServiceDiscoveryListener sl, LookupCache lookupCache) {
1828 this.oldItem = oldItem;
1829 this.item = item;
1830 this.action = action;
1831 this.sl = sl;
1832 this.lookupCache = lookupCache;
1833 }
1834
1835 @Override
1836 public void run() {
1837 ServiceDiscoveryEvent event;
1838 try {
1839 event = new ServiceDiscoveryEvent(lookupCache, oldItem, item);
1840 } catch (NullPointerException e) {
1841 boolean lookupCacheNull = lookupCache == null;
1842 boolean oldItemNull = oldItem == null;
1843 boolean itemNull = item == null;
1844 if (ServiceDiscoveryManager.logger.isLoggable(Level.INFO))
1845 ServiceDiscoveryManager.log(
1846 Level.INFO,
1847 "ServiceDiscoveryEvent constructor threw NullPointerException, lookupCache null? {0} oldItem null? {1} item null? {2}",
1848 new Object[]{lookupCacheNull, oldItemNull, itemNull}
1849 );
1850 return;
1851 }
1852 switch (action) {
1853 case ITEM_ADDED:
1854 sl.serviceAdded(event);
1855 break;
1856 case ITEM_REMOVED:
1857 sl.serviceRemoved(event);
1858 break;
1859 case ITEM_CHANGED:
1860 sl.serviceChanged(event);
1861 break;
1862 default:
1863 throw new IllegalArgumentException("case must be one of the following: ITEM_ADDED, ITEM_REMOVED or ITEM_CHANGED");
1864 }
1865 }
1866 }
1867
1868 void initCache() throws RemoteException {
1869
1870
1871
1872 try {
1873 Exporter defaultExporter =
1874 new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
1875 new AtomicILFactory(null, null, LookupCacheImpl.class.getClassLoader()),
1876 false,
1877 false
1878 );
1879 lookupListenerExporter =
1880 sdm.thisConfig.getEntry(
1881 ServiceDiscoveryManager.COMPONENT_NAME,
1882 "eventListenerExporter",
1883 Exporter.class,
1884 defaultExporter
1885 );
1886 } catch (ConfigurationException e) {
1887
1888 ExportException e1 = new ExportException("Configuration exception while " + "retrieving exporter for " + "cache's remote event listener", e);
1889 throw e1;
1890 }
1891
1892
1893
1894 try {
1895 eventNotificationExecutor =
1896 sdm.thisConfig.getEntry(
1897 ServiceDiscoveryManager.COMPONENT_NAME,
1898 "eventNotificationExecutor",
1899 ExecutorService.class
1900 );
1901 } catch (ConfigurationException e) {
1902
1903 eventNotificationExecutor =
1904 new ThreadPoolExecutor(1, 1, 15L, TimeUnit.SECONDS,
1905 new LinkedBlockingQueue<Runnable>(),
1906 new NamedThreadFactory("SDM event notifier: " + toString(), false),
1907 new ThreadPoolExecutor.CallerRunsPolicy());
1908 }
1909
1910
1911
1912
1913 try {
1914 cacheTaskMgr = sdm.thisConfig.getEntry(
1915 ServiceDiscoveryManager.COMPONENT_NAME,
1916 "cacheExecutorService",
1917 ExecutorService.class
1918 );
1919 } catch (ConfigurationException e) {
1920
1921 cacheTaskMgr = new ThreadPoolExecutor(3, 3, 15L, TimeUnit.SECONDS,
1922 new LinkedBlockingQueue<Runnable>(),
1923 new NamedThreadFactory("SDM lookup cache: " + toString(), false),
1924 new ThreadPoolExecutor.CallerRunsPolicy()
1925 );
1926 }
1927 cacheTaskMgr = new ExtensibleExecutorService(cacheTaskMgr, new ExtensibleExecutorService.RunnableFutureFactory() {
1928 @Override
1929 public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
1930 if (r instanceof ObservableFutureTask) {
1931 return (RunnableFuture<T>) r;
1932 }
1933 return new CacheTaskWrapper<T>(r, value);
1934 }
1935
1936 @Override
1937 public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
1938 if (c instanceof ObservableFutureTask) {
1939 return (RunnableFuture<T>) c;
1940 }
1941 return new CacheTaskWrapper<T>(c);
1942 }
1943 });
1944 cacheTaskDepMgr = new CacheTaskDependencyManager(cacheTaskMgr);
1945
1946
1947
1948
1949
1950
1951 try {
1952 serviceDiscardTimerTaskMgr =
1953 sdm.thisConfig.getEntry(
1954 ServiceDiscoveryManager.COMPONENT_NAME,
1955 "discardExecutorService",
1956 ScheduledExecutorService.class
1957 );
1958 } catch (ConfigurationException e) {
1959
1960 serviceDiscardTimerTaskMgr =
1961 new ScheduledThreadPoolExecutor(
1962 4,
1963 new NamedThreadFactory("SDM discard timer: " + toString(), false)
1964 );
1965 }
1966
1967
1968
1969 try {
1970 incomingEventExecutor = sdm.thisConfig.getEntry(
1971 ServiceDiscoveryManager.COMPONENT_NAME,
1972 "ServiceEventExecutorService",
1973 ExecutorService.class
1974 );
1975 } catch (ConfigurationException e){
1976 incomingEventExecutor =
1977 new ThreadPoolExecutor(1, 1, 15L, TimeUnit.SECONDS,
1978 new PriorityBlockingQueue(256),
1979 new NamedThreadFactory("SDM ServiceEvent: " + toString(), false),
1980 new ThreadPoolExecutor.DiscardOldestPolicy()
1981 );
1982 }
1983 incomingEventExecutor = new ExtensibleExecutorService(incomingEventExecutor,
1984 new ExtensibleExecutorService.RunnableFutureFactory()
1985 {
1986
1987 @Override
1988 public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
1989 return new ComparableFutureTask<T>(r,value);
1990 }
1991
1992 @Override
1993 public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
1994 return new ComparableFutureTask<T>(c);
1995 }
1996
1997 }
1998 );
1999
2000 lookupListenerProxy = lookupListener.export();
2001 sdm.proxyRegSetRead.lock();
2002 try {
2003 Iterator<ProxyReg> it = sdm.proxyRegSet.iterator();
2004 while (it.hasNext()) {
2005 addProxyReg(it.next());
2006 }
2007 } finally {
2008 sdm.proxyRegSetRead.unlock();
2009 }
2010
2011 }
2012
2013
2014
2015
2016
2017
2018
2019 private static class ComparableFutureTask<V> extends FutureTask<V>
2020 implements Comparable<ComparableFutureTask>
2021 {
2022
2023 private final Object task;
2024
2025 public ComparableFutureTask(Runnable runnable, V result) {
2026 super(runnable, result);
2027 task = runnable;
2028 }
2029
2030 public ComparableFutureTask(Callable<V> callable){
2031 super(callable);
2032 task = callable;
2033 }
2034
2035 @Override
2036 public int compareTo(ComparableFutureTask o) {
2037 if (task instanceof Comparable && o.task instanceof Comparable){
2038 return ((Comparable)task).compareTo(o.task);
2039 }
2040 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2041 ServiceDiscoveryManager.log(
2042 Level.FINEST,
2043 "task not instanceof Comparable {0}",
2044 new Object [] {task.getClass().getCanonicalName()}
2045 );
2046 return 0;
2047 }
2048
2049 @Override
2050 public int hashCode() {
2051 int hash = 3;
2052 hash = 89 * hash + (this.task != null ? this.task.hashCode() : 0);
2053 return hash;
2054 }
2055
2056 @Override
2057 public boolean equals(Object o){
2058 if (!(o instanceof ComparableFutureTask)) return false;
2059 return this.task.equals(((ComparableFutureTask)o).task);
2060 }
2061
2062 }
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090 private ServiceItem filterMaybeDiscard(ServiceID srvcID, ServiceItemReg itemReg, ServiceItem item, boolean sendEvent) {
2091 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
2092 ServiceDiscoveryManager.log(
2093 Level.FINE,
2094 "filterMaybeDiscard called, ServiceID: {0}",
2095 new Object [] {srvcID}
2096 );
2097 if ((item == null) || (item.service == null)) {
2098 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
2099 ServiceDiscoveryManager.log(
2100 Level.FINER,
2101 "filterMaybeDiscard, item or service was null, returning null, ServiceID: {0}",
2102 new Object []{srvcID}
2103 );
2104 return null;
2105 }
2106 boolean addFilteredItemToMap = false;
2107
2108 ServiceItem filteredItem = item.clone();
2109 boolean discardRetryLater = false;
2110 boolean pass = false;
2111 if (filter == null) {
2112 pass = true;
2113 if (useInsecureLookup){
2114 addFilteredItemToMap = true;
2115 } else {
2116 try {
2117 filteredItem.service =
2118 ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
2119 addFilteredItemToMap = true;
2120 } catch (RemoteException ex) {
2121 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
2122 ServiceDiscoveryManager.log(Level.FINE,
2123 "Exception thrown while trying to download service proxy",
2124 ex
2125 );
2126 discardRetryLater = true;
2127 }
2128 }
2129 } else {
2130 if (useInsecureLookup){
2131 pass = filter.check(filteredItem);
2132 } else {
2133 try {
2134 pass = filter.check(filteredItem);
2135 } catch (SecurityException ex){
2136 try {
2137
2138 filteredItem.service = ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
2139 pass = filter.check(filteredItem);
2140 } catch (RemoteException ex1) {
2141 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
2142 ServiceDiscoveryManager.log(Level.FINE,
2143 "Exception thrown while trying to download service proxy",
2144 ex1
2145 );
2146 discardRetryLater = true;
2147 }
2148 } catch (ClassCastException ex){
2149 try {
2150
2151 filteredItem.service = ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
2152 pass = filter.check(filteredItem);
2153 } catch (RemoteException ex1) {
2154 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
2155 ServiceDiscoveryManager.log(Level.FINE,
2156 "Exception thrown while trying to download service proxy",
2157 ex1
2158 );
2159 discardRetryLater = true;
2160 }
2161 }
2162 }
2163
2164 if (pass && !discardRetryLater && filteredItem.service != null) {
2165 addFilteredItemToMap = true;
2166 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
2167 ServiceDiscoveryManager.log(Level.FINER,
2168 "filterMaybeDiscard, filter passed, ServiceID: {0}",
2169 new Object[]{srvcID}
2170 );
2171 }
2172 }
2173 PostEventState pes =
2174 new PostEventState(this, itemReg, item, filteredItem, sendEvent,
2175 pass, discardRetryLater, addFilteredItemToMap, sdm.getDiscardWait());
2176 serviceIdMap.computeIfPresent(srvcID, pes);
2177 if (pes.notifyRemoved && pes.oldFilteredItem != null) {
2178 removeServiceNotify(pes.oldFilteredItem);
2179 }
2180 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
2181 ServiceDiscoveryManager.log(Level.FINER,
2182 "filterMaybeDiscard, returning filtered ServiceItem: {0}",
2183 new Object []{pes.filteredItemPass}
2184 );
2185 return pes.filteredItemPass;
2186 }
2187
2188 private static class PostEventState implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg> {
2189
2190 ServiceItem filteredItemPass;
2191 ServiceItem oldFilteredItem = null;
2192 final ServiceItem item;
2193 final ServiceItem filteredItem;
2194 final boolean pass;
2195 final boolean discardRetryLater;
2196 final boolean sendEvent;
2197 final boolean addFilteredItemToMap;
2198 boolean notifyRemoved = false;
2199 final ServiceItemReg expected;
2200 final LookupCacheImpl cache;
2201 final long discardWait;
2202
2203 PostEventState(LookupCacheImpl cache, ServiceItemReg itemReg,
2204 ServiceItem item, ServiceItem filteredItem, boolean sendEvent,
2205 boolean pass, boolean discardRetryLater,
2206 boolean addFilteredItemToMap, long discardWait)
2207 {
2208 this.cache = cache;
2209 this.expected = itemReg;
2210 this.item = item;
2211 this.filteredItem = filteredItem;
2212 this.sendEvent = sendEvent;
2213 this.pass = pass;
2214 this.discardRetryLater = discardRetryLater;
2215 this.addFilteredItemToMap = addFilteredItemToMap;
2216 this.discardWait = discardWait;
2217 }
2218
2219 @Override
2220 public ServiceItemReg apply(ServiceID id, ServiceItemReg itemReg) {
2221 if (!expected.equals(itemReg)) return itemReg;
2222 ServiceItemReg removeIfNull = itemReg;
2223
2224 if (!pass && !discardRetryLater) {
2225 if (itemReg != null) {
2226 if (sendEvent) {
2227 oldFilteredItem = itemReg.getFilteredItem();
2228 notifyRemoved = true;
2229 removeIfNull = null;
2230 } else {
2231 removeIfNull = null;
2232 }
2233 }
2234 filteredItemPass = null;
2235 }
2236 if (addFilteredItemToMap){
2237
2238
2239
2240
2241
2242
2243 cache.cancelDiscardTask(id);
2244 itemReg.replaceProxyUsedToTrackChange(null, item);
2245 itemReg.setFilteredItem(filteredItem);
2246 filteredItemPass = filteredItem;
2247 }
2248
2249 if (discardRetryLater){
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260 if (itemReg.discard()) {
2261 itemReg.replaceProxyUsedToTrackChange(null, item);
2262 itemReg.setFilteredItem(null);
2263 Future f = cache.serviceDiscardTimerTaskMgr.schedule(
2264 new ServiceDiscardTimerTask(cache, item.serviceID),
2265 discardWait,
2266 TimeUnit.MILLISECONDS
2267 );
2268 cache.serviceDiscardFutures.put(item.serviceID, f);
2269 if (sendEvent) {
2270 notifyRemoved = true;
2271 }
2272 }
2273 }
2274 return removeIfNull;
2275 }
2276
2277 }
2278
2279
2280
2281
2282
2283
2284
2285
2286 private void handleMatchNoMatch(ServiceRegistrar proxy, ServiceID srvcID) {
2287 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
2288 ServiceDiscoveryManager.log(Level.FINER,
2289 "handleMatchNoMatch called, ServiceID: {0}",
2290 new Object []{srvcID}
2291 );
2292 DissociateLusCleanUpOrphan dlcl = new DissociateLusCleanUpOrphan(this, proxy);
2293 serviceIdMap.computeIfPresent(srvcID, dlcl);
2294 if (dlcl.itemRegProxy != null) {
2295 itemMatchMatchChange(srvcID, dlcl.itmReg, dlcl.itemRegProxy, dlcl.newItem, false);
2296 } else if (dlcl.notify && dlcl.filteredItem != null) {
2297 removeServiceNotify(dlcl.filteredItem);
2298 }
2299 }
2300
2301
2302
2303
2304 private static class DissociateLusCleanUpOrphan
2305 implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg>
2306 {
2307
2308 final LookupCacheImpl cache;
2309 boolean notify;
2310 final ServiceRegistrar proxy;
2311 ServiceRegistrar itemRegProxy;
2312 ServiceItemReg itmReg;
2313 ServiceItem newItem;
2314 ServiceItem filteredItem;
2315
2316 DissociateLusCleanUpOrphan(LookupCacheImpl cache, ServiceRegistrar proxy){
2317 this.itmReg = null;
2318 this.notify = false;
2319 this.itemRegProxy = null;
2320 this.cache = cache;
2321 this.proxy = proxy;
2322 this.newItem = null;
2323 this.filteredItem = null;
2324 }
2325
2326 @Override
2327 public ServiceItemReg apply(ServiceID srvcID, ServiceItemReg itemReg) {
2328 itmReg = itemReg;
2329 newItem = itemReg.removeProxy(proxy);
2330 filteredItem = itemReg.getFilteredItem();
2331 if (newItem != null) {
2332 itemRegProxy = itemReg.getProxy();
2333 }
2334
2335
2336
2337 else if (itemReg.hasNoProxys()) {
2338 if (itemReg.isDiscarded()) {
2339
2340 itmReg = null;
2341 cache.cancelDiscardTask(srvcID);
2342 } else {
2343
2344 notify = true;
2345 itmReg = null;
2346 }
2347 }
2348 return itmReg;
2349 }
2350 }
2351
2352
2353
2354
2355 private void cancelDiscardTask(ServiceID sid) {
2356
2357 Future task = serviceDiscardFutures.get(sid);
2358 if (task != null) {
2359 task.cancel(true);
2360 }
2361 }
2362
2363
2364
2365
2366 final static class CacheTaskDependencyManager implements FutureObserver {
2367
2368
2369
2370 private final ConcurrentLinkedQueue<CacheTaskWrapper> pending;
2371 private final ExecutorService executor;
2372
2373 CacheTaskDependencyManager(ExecutorService e) {
2374 this.pending = new ConcurrentLinkedQueue<CacheTaskWrapper>();
2375 executor = e;
2376 }
2377
2378 CacheTaskWrapper submit(Runnable t) {
2379 CacheTaskWrapper future = new CacheTaskWrapper(t, null);
2380 pending.offer(future);
2381 future.addObserver(this);
2382 if (t instanceof CacheTask && ((CacheTask) t).hasDeps()) {
2383 List<FutureObserver.ObservableFuture> deps = new LinkedList<FutureObserver.ObservableFuture>();
2384 Iterator<CacheTaskWrapper> it = pending.iterator();
2385 while (it.hasNext()) {
2386 CacheTaskWrapper w = it.next();
2387 Object c = w.getTask();
2388 if (c instanceof CacheTask && ((CacheTask) t).dependsOn((CacheTask) c)) {
2389 deps.add(w);
2390 }
2391 }
2392 if (deps.isEmpty()) {
2393 executor.submit(future);
2394 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2395 ServiceDiscoveryManager.log(
2396 Level.FINEST,
2397 "ServiceDiscoveryManager {0} submitted to executor task queue",
2398 new Object []{t.toString()}
2399 );
2400 } else {
2401 DependencyLinker linker = new DependencyLinker(executor, deps, future);
2402 linker.register();
2403 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2404 ServiceDiscoveryManager.log(
2405 Level.FINEST,
2406 "ServiceDiscoveryManager {0} registered dependencies",
2407 new Object [] {t.toString()}
2408 );
2409 }
2410 } else {
2411 executor.submit(future);
2412 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2413 ServiceDiscoveryManager.log(Level.FINEST,
2414 "ServiceDiscoveryManager {0} submitted to executor task queue",
2415 new Object []{t.toString()}
2416 );
2417 }
2418 return future;
2419 }
2420
2421 @Override
2422 public void futureCompleted(Future e) {
2423 pending.remove(e);
2424 Object t;
2425 if (e instanceof CacheTaskWrapper) {
2426 t = ((CacheTaskWrapper) e).getTask();
2427 } else {
2428 t = e;
2429 }
2430 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2431 ServiceDiscoveryManager.log(
2432 Level.FINEST,
2433 "ServiceDiscoveryManager {0} completed execution",
2434 new Object[]{t.toString()}
2435 );
2436 }
2437
2438
2439
2440
2441
2442
2443 void removeUselessTask(ProxyReg reg) {
2444 Iterator<CacheTaskWrapper> it = pending.iterator();
2445 while (it.hasNext()) {
2446 CacheTaskWrapper w = it.next();
2447 Object t = w.getTask();
2448 if (t instanceof CacheTask && ((CacheTask) t).isFromProxy(reg)) {
2449 w.cancel(true);
2450 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2451 ServiceDiscoveryManager.log(
2452 Level.FINEST,
2453 "ServiceDiscoveryManager {0} cancelled",
2454 new Object[]{t.toString()}
2455 );
2456 }
2457 }
2458 }
2459
2460 }
2461
2462
2463
2464
2465
2466
2467 final static class CacheTaskWrapper<T> extends ObservableFutureTask<T> {
2468
2469 private final Object task;
2470
2471 CacheTaskWrapper(Runnable r, T result) {
2472 super(r, result);
2473 task = r;
2474 }
2475
2476 CacheTaskWrapper(Callable<T> c) {
2477 super(c);
2478 task = c;
2479 }
2480
2481 Object getTask() {
2482 return task;
2483 }
2484
2485 }
2486
2487
2488
2489
2490 abstract static class CacheTask implements Runnable {
2491
2492 protected final ProxyReg reg;
2493 protected volatile long thisTaskSeqN;
2494
2495 protected CacheTask(ProxyReg reg, long seqN) {
2496 this.reg = reg;
2497 this.thisTaskSeqN = seqN;
2498 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2499 ServiceDiscoveryManager.log(
2500 Level.FINEST,
2501 "ServiceDiscoveryManager {0} constructed",
2502 new Object[]{toString()}
2503 );
2504 }
2505
2506
2507 public boolean isFromProxy(ProxyReg reg) {
2508 if (this.reg == null) {
2509 return false;
2510 }
2511 return (this.reg).equals(reg);
2512 }
2513
2514
2515
2516
2517 public ProxyReg getProxyReg() {
2518 return reg;
2519 }
2520
2521
2522
2523
2524 public long getSeqN() {
2525 return thisTaskSeqN;
2526 }
2527
2528 public abstract boolean hasDeps();
2529
2530 public boolean dependsOn(CacheTask task) {
2531 return false;
2532 }
2533 }
2534 }