RouterOS API
#!/usr/bin/env python3
import codecs
import hashlib
import threading
import socket
import shlex
class RosApi:
def __init__(self, socket):
self._socket = socket
def read_bytes(self, n):
b = b''
while n > 0:
tmp = self._socket.recv(n)
n -= len(tmp)
b += tmp
return b
def write_bytes(self, b):
self._socket.send(b)
def read_int(self):
byteorder = "big"
n, = self.read_bytes(1)
if not n & 0b10000000:
mask, more = 0b01111111, 0
elif not n & 0b01000000:
mask, more = 0b00111111, 1
elif not n & 0b00100000:
mask, more = 0b00111111, 2
elif not n & 0b00010000:
mask, more = 0b00011111, 3
else:
mask, more = 0b00001111, 4
return int.from_bytes(
(n & mask).to_bytes(1, byteorder) + self.read_bytes(more), byteorder)
def write_int(self, n):
byteorder = "big"
if n < 0x80:
b = n.to_bytes(1, byteorder)
elif n < 0x4000:
b = (n | 0x8000).to_bytes(2, byteorder)
elif n < 0x200000:
b = (n | 0xC00000).to_bytes(3, byteorder)
elif n < 0x10000000:
b = (n | 0xE0000000).to_bytes(4, byteorder)
else:
b = b'\xF0' + n.to_bytes(4, byteorder)
self.write_bytes(b)
def read_word(self):
return self.read_bytes(self.read_int()).decode()
def write_word(self, s):
bs = s.encode()
self.write_int(len(bs))
self.write_bytes(bs)
def talk(self, *words):
for w in words:
self.write_word(w)
self.write_bytes(b'\x00')
def listen(self):
words = []
while True:
w = self.read_word()
if not w:
break
words.append(w)
return words
def login(self, name="admin", password=""):
self.talk("/login")
done, msg = self.listen()
*_, challenge = msg.split("=")
assert done == "!done", done
md5 = hashlib.md5(b'\x00' + password.encode() + codecs.decode(challenge, "hex")).hexdigest()
self.talk("/login", "=name={}".format(name), "=response=00{}".format(md5))
reply = self.listen()
assert reply == ["!done"], reply
def test():
class Socket():
def __init__(self, data):
self._data = data
self._idx = 0
def send(self, b):
print(codecs.encode(b, "hex"))
def recv(self, n):
#n = 1
i = self._idx
assert i < len(self._data)
self._idx = i + n
return self._data[i:i + n]
api = RosApi(Socket(b'\x7F' b'\xBF\xFF' b'\xCC'))
api.write_int(1)
api.write_int(127), api.write_int(128)
api.write_int(0x3FFF), api.write_int(0x4000)
api.write_int(0x1FFFFF), api.write_int(0x200000)
api.write_int(0x0FFFFFFF), api.write_int(0x10000000)
api = RosApi(Socket(
b'\x7F' b'\x80\x80'
b'\xBF\xFF' b'\xC0\x40\x00'
b'\xDF\xFF\xFF' b'\xE0\x20\x00\x00'
b'\xEF\xFF\xFF\xFF' b'\xF0\x10\x00\x00\x00'
))
assert api.read_int() == 127
assert api.read_int() == 128
assert api.read_int() == 0x3FFF
assert api.read_int() == 0x4000
assert api.read_int() == 0x1FFFFF
assert api.read_int() == 0x200000
assert api.read_int() == 0x0FFFFFFF
assert api.read_int() == 0x10000000
def main():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.settimeout(1)
s.connect(("192.168.0.21", 8728))
api = RosApi(s)
api.login('q', '')
run = True
def show_reply():
while run:
try:
command, *attributes = api.listen()
except socket.timeout:
continue
print(command)
for i in attributes:
print(" " + i)
print()
threading.Thread(target=show_reply).start()
try:
words = []
while True:
tmp = input()
if tmp:
words.append(tmp)
else:
api.talk(*shlex.split(" ".join(words)))
words.clear()
finally:
run = False
if __name__ == "__main__":
main()