You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

114 lines
3.6 KiB

#
# (c) 2017 Aleph Objects, Inc.
#
# The code in this page is free software: you can
# redistribute it and/or modify it under the terms of the GNU
# General Public License (GNU GPL) as published by the Free Software
# Foundation, either version 3 of the License, or (at your option)
# any later version. The code is distributed WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU GPL for more details.
#
import functools
import random
import string
import re
class FakeMarlinSerialDevice:
"""This serial class simply pretends to be Marlin by acknowledging
commands with "ok" and requesting commands to be resent if they
contain errors"""
def __init__(self):
self.line = 1
self.replies = []
self.pendingOk = 0
self.dropCharacters = 0
self.cumulativeReads = 0
self.cumulativeWrites = 0
self.cumulativeQueueSize = 0
self.cumulativeErrors = 0
def _enqueue_reply(self, str):
if str.startswith("ok"):
self.pendingOk += 1
self.replies.append(str + '\n')
def _dequeue_reply(self):
if len(self.replies):
reply = self.replies.pop(0)
if reply.startswith("ok"):
self.pendingOk -= 1
return reply
else:
return ''
def _enqueue_okay(self):
self._enqueue_reply("ok T:10")
def _computeChecksum(self, data):
"""Computes the GCODE checksum, this is the XOR of all characters in the payload, including the position"""
return functools.reduce(lambda x,y: x^y, map(ord, data) if isinstance(data, str) else list(data))
def write(self, data):
if isinstance(data, str):
data = data.encode()
if data.strip() == b"":
return
if self.dropCharacters:
data = data[self.dropCharacters:]
self.dropCharacters = 0
hasLineNumber = b"N" in data
hasChecksum = b"*" in data
if not hasLineNumber and not hasChecksum:
self._enqueue_okay()
return
# Handle M110 commands which tell Marlin to reset the line counter
m = re.match(b'N(\d+)M110\*(\d+)$', data)
if m:
self.line = int(m.group(1))
m = re.match(b'N(\d+)(\D[^*]*)\*(\d+)$', data)
if m and int(m.group(1)) == self.line and self._computeChecksum(b"N%d%s" % (self.line, m.group(2))) == int(m.group(3)):
# We have a valid, properly sequenced command with a valid checksum
self.line += 1
else:
# Otherwise, request the command be resent
self.cumulativeErrors += 1
self.replies = []
self.pendingOk = 0
self._enqueue_reply("Resend: " + str(self.line))
# When Marlin issues a resend, it often misses the next few
# characters. So simulate this here.
self.dropCharacters = random.randint(0, 5)
for i in range(0,random.randint(0,4)):
# Simulate a command that takes a while to execute
self._enqueue_reply("")
self._enqueue_okay()
self.cumulativeWrites += 1
self.cumulativeQueueSize += self.pendingOk
def readline(self):
self.cumulativeReads += 1
return self._dequeue_reply().encode()
def flush(self):
pass
def close(self):
print("Average length of commands queue: %.2f" % (float(self.cumulativeQueueSize) / self.cumulativeWrites))
print("Average reads per write: %.2f" % (float(self.cumulativeReads) / self.cumulativeWrites))
print("Average errors per write: %.2f" % (float(self.cumulativeErrors) / self.cumulativeWrites))
print("Total writes: %d" % self.cumulativeWrites)
print("Total errors: %d" % self.cumulativeErrors)
print()