-
Notifications
You must be signed in to change notification settings - Fork 572
Expand file tree
/
Copy pathobjectracker.py
More file actions
130 lines (113 loc) · 4.36 KB
/
objectracker.py
File metadata and controls
130 lines (113 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
src/network/objectracker.py
===========================
"""
import time
from threading import RLock
import network.connectionpool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict
haveBloom = False
try:
# pybloomfiltermmap
from pybloomfilter import BloomFilter
haveBloom = True
except ImportError:
try:
# pybloom
from pybloom import BloomFilter
haveBloom = True
except ImportError:
pass
# it isn't actually implemented yet so no point in turning it on
haveBloom = False
# tracking pending downloads globally, for stats
missingObjects = {}
class ObjectTracker(object):
"""Object tracker mixin"""
invCleanPeriod = 300
invInitialCapacity = 50000
invErrorRate = 0.03
trackingExpires = 3600
initialTimeOffset = 60
def __init__(self):
self.objectsNewToMe = RandomTrackingDict()
self.objectsNewToThem = {}
self.objectsNewToThemLock = RLock()
self.initInvBloom()
self.initAddrBloom()
self.lastCleaned = time.time()
def initInvBloom(self):
"""Init bloom filter for tracking. WIP."""
if haveBloom:
# lock?
self.invBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity,
error_rate=ObjectTracker.invErrorRate)
def initAddrBloom(self):
"""Init bloom filter for tracking addrs, WIP. This either needs to be moved to addrthread.py or removed."""
if haveBloom:
# lock?
self.addrBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity,
error_rate=ObjectTracker.invErrorRate)
def clean(self):
"""Clean up tracking to prevent memory bloat"""
if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod:
if haveBloom:
if missingObjects == 0:
self.initInvBloom()
self.initAddrBloom()
else:
# release memory
deadline = time.time() - ObjectTracker.trackingExpires
with self.objectsNewToThemLock:
self.objectsNewToThem = {k: v for k, v in self.objectsNewToThem.iteritems() if v >= deadline}
self.lastCleaned = time.time()
def hasObj(self, hashid):
"""Do we already have object?"""
if haveBloom:
return hashid in self.invBloom
return hashid in self.objectsNewToMe
def handleReceivedInventory(self, hashId):
"""Handling received inventory"""
if haveBloom:
self.invBloom.add(hashId)
try:
with self.objectsNewToThemLock:
del self.objectsNewToThem[hashId]
except KeyError:
pass
if hashId not in missingObjects:
missingObjects[hashId] = time.time()
self.objectsNewToMe[hashId] = True
def handleReceivedObject(self, streamNumber, hashid):
"""Handling received object"""
for i in network.connectionpool.BMConnectionPool().inboundConnections.values(
) + network.connectionpool.BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished:
continue
try:
del i.objectsNewToMe[hashid]
except KeyError:
if streamNumber in i.streams and (
not Dandelion().hasHash(hashid) or Dandelion().objectChildStem(hashid) == i):
with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = time.time()
# update stream number, which we didn't have when we just received the dinv
# also resets expiration of the stem mode
Dandelion().setHashStream(hashid, streamNumber)
if i == self:
try:
with i.objectsNewToThemLock:
del i.objectsNewToThem[hashid]
except KeyError:
pass
self.objectsNewToMe.setLastObject()
def hasAddr(self, addr):
"""WIP, should be moved to addrthread.py or removed"""
if haveBloom:
return addr in self.invBloom
return None
def addAddr(self, hashid):
"""WIP, should be moved to addrthread.py or removed"""
if haveBloom:
self.addrBloom.add(hashid)