You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							114 lines
						
					
					
						
							3.6 KiB
						
					
					
				
			
		
		
	
	
							114 lines
						
					
					
						
							3.6 KiB
						
					
					
				#
 | 
						|
# (c) 2017 Aleph Objects, Inc.
 | 
						|
#
 | 
						|
# The code in this page is free software: you can
 | 
						|
# redistribute it and/or modify it under the terms of the GNU
 | 
						|
# General Public License (GNU GPL) as published by the Free Software
 | 
						|
# Foundation, either version 3 of the License, or (at your option)
 | 
						|
# any later version.  The code is distributed WITHOUT ANY WARRANTY;
 | 
						|
# without even the implied warranty of MERCHANTABILITY or FITNESS
 | 
						|
# FOR A PARTICULAR PURPOSE.  See the GNU GPL for more details.
 | 
						|
#
 | 
						|
 | 
						|
import functools
 | 
						|
import random
 | 
						|
import string
 | 
						|
import re
 | 
						|
 | 
						|
class FakeMarlinSerialDevice:
  """Fake serial port that pretends to be a printer running Marlin.

  Commands passed to write() are acknowledged with "ok T:10". Commands that
  carry a line number and checksum are validated; when the line number is out
  of sequence or the checksum is wrong, a "Resend: <line>" reply is queued
  instead. Replies are retrieved one at a time with readline().

  The object also accumulates simple statistics (reads, writes, errors and
  the number of outstanding "ok" replies) which close() prints.
  """

  def __init__(self):
    self.line           = 1   # Line number expected on the next numbered command
    self.replies        = []  # Queued replies, oldest first, each ending in '\n'
    self.pendingOk      = 0   # Count of queued replies that start with "ok"
    self.dropCharacters = 0   # Leading bytes to discard from the next write()

    self.cumulativeReads     = 0
    self.cumulativeWrites    = 0
    self.cumulativeQueueSize = 0
    self.cumulativeErrors    = 0

  def _enqueue_reply(self, str):
    """Appends a reply (newline-terminated) to the queue, counting "ok" replies."""
    if str.startswith("ok"):
      self.pendingOk += 1
    self.replies.append(str + '\n')

  def _dequeue_reply(self):
    """Removes and returns the oldest queued reply, or '' if none is queued."""
    if not self.replies:
      return ''
    reply = self.replies.pop(0)
    if reply.startswith("ok"):
      self.pendingOk -= 1
    return reply

  def _enqueue_okay(self):
    """Queues the standard acknowledgement reply."""
    self._enqueue_reply("ok T:10")

  def _computeChecksum(self, data):
    """Computes the GCODE checksum, this is the XOR of all characters in the payload, including the position

    Accepts either a str or a bytes-like object. Empty input yields 0 (the
    XOR identity) rather than raising.
    """
    values = map(ord, data) if isinstance(data, str) else data
    return functools.reduce(lambda x, y: x ^ y, values, 0)

  def write(self, data):
    """Accepts one command (str or bytes) and queues the simulated replies.

    Whitespace-only input is ignored. A command containing neither "N" nor
    "*" is acknowledged immediately and is not included in the statistics.
    Any other command must match N<line><command>*<checksum> with the
    expected line number and a correct checksum; otherwise the reply queue
    is discarded and a resend request is queued.
    """
    if isinstance(data, str):
      data = data.encode()

    if data.strip() == b"":
      return

    # Simulate characters lost after a previous resend request
    if self.dropCharacters:
      data = data[self.dropCharacters:]
      self.dropCharacters = 0

    # These are plain containment checks: an "N" or "*" anywhere in the
    # command marks it as carrying a line number or checksum.
    hasLineNumber = b"N" in data
    hasChecksum   = b"*" in data

    if not hasLineNumber and not hasChecksum:
      self._enqueue_okay()
      return

    # Handle M110 commands which tell Marlin to reset the line counter
    m = re.match(rb'N(\d+)M110\*(\d+)$', data)
    if m:
      self.line = int(m.group(1))

    m = re.match(rb'N(\d+)(\D[^*]*)\*(\d+)$', data)
    if m and int(m.group(1)) == self.line and self._computeChecksum(b"N%d%s" % (self.line, m.group(2))) == int(m.group(3)):
      # We have a valid, properly sequenced command with a valid checksum
      self.line += 1
    else:
      # Otherwise, request the command be resent
      self.cumulativeErrors += 1
      self.replies   = []
      self.pendingOk = 0
      self._enqueue_reply("Resend: " + str(self.line))
      # When Marlin issues a resend, it often misses the next few
      # characters. So simulate this here.
      self.dropCharacters = random.randint(0, 5)

    for _ in range(random.randint(0, 4)):
      # Simulate a command that takes a while to execute
      self._enqueue_reply("")
    self._enqueue_okay()

    self.cumulativeWrites     += 1
    self.cumulativeQueueSize  += self.pendingOk

  def readline(self):
    """Returns the next queued reply as bytes, or b'' when the queue is empty."""
    self.cumulativeReads      += 1
    return self._dequeue_reply().encode()

  def flush(self):
    """Does nothing; present so the object can stand in for a serial port."""
    pass

  def close(self):
    """Prints the accumulated statistics.

    Averages are reported as 0.00 when no numbered command was written, so
    closing an unused device does not raise ZeroDivisionError.
    """
    writes = self.cumulativeWrites

    def average(total):
      return float(total) / writes if writes else 0.0

    print("Average length of commands queue: %.2f" % average(self.cumulativeQueueSize))
    print("Average reads per write:          %.2f" % average(self.cumulativeReads))
    print("Average errors per write:         %.2f" % average(self.cumulativeErrors))
    print("Total writes:                     %d"   % self.cumulativeWrites)
    print("Total errors:                     %d"   % self.cumulativeErrors)
    print()
 |