Customizing your workflow
from BlockchainSpider.items import SyncItem, TransactionItem, TraceItem, \
Token721TransferItem, Token20TransferItem, Token1155TransferItem
class MoTSPipeline:
    """Item pipeline that writes one motif-count row per transaction.

    For every ``SyncItem`` passing through, the money-transfer items it
    carries are grouped by transaction hash, each group is handed to
    ``HighOrderMotifCounter`` and the resulting counts are written with
    ``self.writer``.
    """

    def __init__(self):
        # Setup is elided in this example.  ``process_item`` reads
        # ``self.file`` (treated as "output disabled" when it is None) and
        # calls ``self.writer.writerow``, so both must be assigned here.
        # NOTE(review): ``self.writer`` is presumably a ``csv.writer`` -- confirm.
        ...

    def process_item(self, item, spider):
        """Count motifs for every transaction found in a ``SyncItem``.

        :param item: the item flowing through the pipeline; anything that is
            not a ``SyncItem`` is passed through untouched.
        :param spider: the running spider (unused).
        :return: ``item``, unchanged, so later pipelines still receive it.
        """
        if self.file is None:
            return item
        if not isinstance(item, SyncItem):
            return item

        # Collect money transfer items.
        # The 'data' field of a SyncItem is a dict whose keys are parsed item
        # class names and whose values are the parsed items; all items in one
        # SyncItem are parsed from the same block.
        transfer_type_names = [
            cls.__name__ for cls in (
                TransactionItem, TraceItem,
                Token721TransferItem, Token20TransferItem,
                Token1155TransferItem,
            )
        ]
        txhash2edges = {}
        for name in transfer_type_names:
            transfer_items = item['data'].get(name)
            if not transfer_items:
                continue
            for transfer_item in transfer_items:
                edges = txhash2edges.setdefault(
                    transfer_item['transaction_hash'], [],
                )
                edges.append({
                    'address_from': transfer_item['address_from'],
                    'address_to': transfer_item['address_to'],
                })

        # Count and write in a single pass.  The original kept the vectors in
        # a separate list and zipped it with an undefined ``txhashes`` name,
        # which raised NameError; iterating the dict keeps every hash paired
        # with its own vector.
        for txhash, edges in txhash2edges.items():
            vec = HighOrderMotifCounter(motif_size=4).count(edges)
            # Entries 1 through 16 of the count result form the output row.
            vec_list = [vec[i] for i in range(1, 16 + 1)]
            self.writer.writerow([txhash, *vec_list])
        return item