-
Notifications
You must be signed in to change notification settings - Fork 8
/
receive_file.py
149 lines (123 loc) · 3.73 KB
/
receive_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
"""
receive file [email protected]
payload is json:
"timeid": message ID
"filename": file name
"filesize": "filename" size
"filehash": "filename" hash (md5)
"chunkdata": chunk of the "filename"
"chunkhash": hash of the "chunkdata" (md5)
"chunknumber": number of "chunkdata", numbered from (0 - null,zero)
"encode": "chunkdata" encoding type (base64)
"end": end of message (True - end)
Usage: receive_file.py dir
"""
import os
import sys
import time
import glob
import _thread
import json
import binascii
import base64
import hashlib
import paho.mqtt.client as mqtt
HOST = "192.168.0.10"
PORT = 1883
SUBTOPIC = "/file"
PUBTOPIC = SUBTOPIC+"/status"
TEMPDIR = "temp"
client = mqtt.Client() # mqtt client
def my_json(msg):
return json.dumps(msg) # object2string
def my_exit(err): # exit programm
os._exit(err)
os.kill(os.getpid)
def my_md5(fname): # calculate md5sum
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def my_temp_file(mydata, myhash, mynumber, timeid, filename):
""" save data to temp file
and send recieved chunknumber
"""
if hashlib.md5(mydata.encode()).hexdigest() == myhash:
fname = TEMPDIR+"/"+str(timeid)+"_"+filename+"_.temp"
f = open(fname, "ab")
if mynumber == 0:
f = open(fname, "wb")
try:
f.write(base64.b64decode(mydata))
except Exception as e:
print("ERR: write file", fname, e)
return 1
finally:
f.close()
print("saved chunk", mynumber, "to", fname)
client.publish(PUBTOPIC, my_json({"chunknumber": mynumber}))
def my_check_temp_files(filename, timeid, filehash):
""" check temp file and rename to original
"""
os.sync()
for l in os.listdir(TEMPDIR):
nameid = l.split("_")[0]
if nameid == timeid:
if my_md5(TEMPDIR+"/"+l) == filehash:
os.rename(TEMPDIR+"/"+l, TEMPDIR+"/"+filename)
for f in glob.glob(TEMPDIR+"/*.temp"):
os.remove(f)
print("OK: saved file", filename)
def my_event(top, msg, qos, retain):
""" convert msg to json,
send data to file
"""
try:
if type(msg) is bytes:
msg = msg.decode()
j = json.loads(msg)
except Exception as e:
print("ERR: msg2json", e)
my_exit(2)
try:
if j["end"] is False:
my_temp_file(
j["chunkdata"],
j["chunkhash"],
j["chunknumber"],
j["timeid"],
j["filename"])
if j["end"] is True:
my_check_temp_files(j["filename"], j["timeid"], j["filehash"])
my_exit(0)
except Exception as e:
print("ERR: parse json", e)
my_exit(3)
def on_connect(client, userdata, flags, rc):
print("OK Connected with result code "+str(rc))
client.subscribe(SUBTOPIC, qos=0)
print("Subscribe: " + SUBTOPIC)
def on_message(client, userdata, msg):
_thread.start_new_thread(my_event, (
msg.topic,
msg.payload,
msg.qos,
msg.retain))
def main():
if not os.path.exists(TEMPDIR):
try:
os.makedirs(TEMPDIR)
except:
print("ERR create dir "+TEMPDIR)
return 1
client.connect(HOST, PORT, 60)
# client.connect("localhost", 1883, 60)
# client.connect("test.mosquitto.org")
# client.connect("broker.hivemq.com", 1883, 60)
client.on_connect = on_connect
client.on_message = on_message
client.loop_forever()
if __name__ == "__main__":
main()