xfys's picture
Upload 432 files
f07f089
raw
history blame
12.3 kB
# cython: auto_pickle=False
#=======================================================================
#
# Python Lexical Analyser
#
#
# Scanning an input stream
#
#=======================================================================
from __future__ import absolute_import
import cython
cython.declare(BOL=object, EOL=object, EOF=object, NOT_FOUND=object)
from . import Errors
from .Regexps import BOL, EOL, EOF
NOT_FOUND = object()
class Scanner(object):
"""
A Scanner is used to read tokens from a stream of characters
using the token set specified by a Plex.Lexicon.
Constructor:
Scanner(lexicon, stream, name = '')
See the docstring of the __init__ method for details.
Methods:
See the docstrings of the individual methods for more
information.
read() --> (value, text)
Reads the next lexical token from the stream.
position() --> (name, line, col)
Returns the position of the last token read using the
read() method.
begin(state_name)
Causes scanner to change state.
produce(value [, text])
Causes return of a token value to the caller of the
Scanner.
"""
# lexicon = None # Lexicon
# stream = None # file-like object
# name = ''
# buffer = ''
# buf_start_pos = 0 # position in input of start of buffer
# next_pos = 0 # position in input of next char to read
# cur_pos = 0 # position in input of current char
# cur_line = 1 # line number of current char
# cur_line_start = 0 # position in input of start of current line
# start_pos = 0 # position in input of start of token
# start_line = 0 # line number of start of token
# start_col = 0 # position in line of start of token
# text = None # text of last token read
# initial_state = None # Node
# state_name = '' # Name of initial state
# queue = None # list of tokens to be returned
# trace = 0
def __init__(self, lexicon, stream, name='', initial_pos=None):
"""
Scanner(lexicon, stream, name = '')
|lexicon| is a Plex.Lexicon instance specifying the lexical tokens
to be recognised.
|stream| can be a file object or anything which implements a
compatible read() method.
|name| is optional, and may be the name of the file being
scanned or any other identifying string.
"""
self.trace = 0
self.buffer = u''
self.buf_start_pos = 0
self.next_pos = 0
self.cur_pos = 0
self.cur_line = 1
self.start_pos = 0
self.start_line = 0
self.start_col = 0
self.text = None
self.state_name = None
self.lexicon = lexicon
self.stream = stream
self.name = name
self.queue = []
self.initial_state = None
self.begin('')
self.next_pos = 0
self.cur_pos = 0
self.cur_line_start = 0
self.cur_char = BOL
self.input_state = 1
if initial_pos is not None:
self.cur_line, self.cur_line_start = initial_pos[1], -initial_pos[2]
def read(self):
"""
Read the next lexical token from the stream and return a
tuple (value, text), where |value| is the value associated with
the token as specified by the Lexicon, and |text| is the actual
string read from the stream. Returns (None, '') on end of file.
"""
queue = self.queue
while not queue:
self.text, action = self.scan_a_token()
if action is None:
self.produce(None)
self.eof()
else:
value = action.perform(self, self.text)
if value is not None:
self.produce(value)
result = queue[0]
del queue[0]
return result
def scan_a_token(self):
"""
Read the next input sequence recognised by the machine
and return (text, action). Returns ('', None) on end of
file.
"""
self.start_pos = self.cur_pos
self.start_line = self.cur_line
self.start_col = self.cur_pos - self.cur_line_start
action = self.run_machine_inlined()
if action is not None:
if self.trace:
print("Scanner: read: Performing %s %d:%d" % (
action, self.start_pos, self.cur_pos))
text = self.buffer[
self.start_pos - self.buf_start_pos:
self.cur_pos - self.buf_start_pos]
return (text, action)
else:
if self.cur_pos == self.start_pos:
if self.cur_char is EOL:
self.next_char()
if self.cur_char is None or self.cur_char is EOF:
return (u'', None)
raise Errors.UnrecognizedInput(self, self.state_name)
def run_machine_inlined(self):
"""
Inlined version of run_machine for speed.
"""
state = self.initial_state
cur_pos = self.cur_pos
cur_line = self.cur_line
cur_line_start = self.cur_line_start
cur_char = self.cur_char
input_state = self.input_state
next_pos = self.next_pos
buffer = self.buffer
buf_start_pos = self.buf_start_pos
buf_len = len(buffer)
b_action, b_cur_pos, b_cur_line, b_cur_line_start, b_cur_char, b_input_state, b_next_pos = \
None, 0, 0, 0, u'', 0, 0
trace = self.trace
while 1:
if trace: #TRACE#
print("State %d, %d/%d:%s -->" % ( #TRACE#
state['number'], input_state, cur_pos, repr(cur_char))) #TRACE#
# Begin inlined self.save_for_backup()
#action = state.action #@slow
action = state['action'] #@fast
if action is not None:
b_action, b_cur_pos, b_cur_line, b_cur_line_start, b_cur_char, b_input_state, b_next_pos = \
action, cur_pos, cur_line, cur_line_start, cur_char, input_state, next_pos
# End inlined self.save_for_backup()
c = cur_char
#new_state = state.new_state(c) #@slow
new_state = state.get(c, NOT_FOUND) #@fast
if new_state is NOT_FOUND: #@fast
new_state = c and state.get('else') #@fast
if new_state:
if trace: #TRACE#
print("State %d" % new_state['number']) #TRACE#
state = new_state
# Begin inlined: self.next_char()
if input_state == 1:
cur_pos = next_pos
# Begin inlined: c = self.read_char()
buf_index = next_pos - buf_start_pos
if buf_index < buf_len:
c = buffer[buf_index]
next_pos += 1
else:
discard = self.start_pos - buf_start_pos
data = self.stream.read(0x1000)
buffer = self.buffer[discard:] + data
self.buffer = buffer
buf_start_pos += discard
self.buf_start_pos = buf_start_pos
buf_len = len(buffer)
buf_index -= discard
if data:
c = buffer[buf_index]
next_pos += 1
else:
c = u''
# End inlined: c = self.read_char()
if c == u'\n':
cur_char = EOL
input_state = 2
elif not c:
cur_char = EOL
input_state = 4
else:
cur_char = c
elif input_state == 2:
cur_char = u'\n'
input_state = 3
elif input_state == 3:
cur_line += 1
cur_line_start = cur_pos = next_pos
cur_char = BOL
input_state = 1
elif input_state == 4:
cur_char = EOF
input_state = 5
else: # input_state = 5
cur_char = u''
# End inlined self.next_char()
else: # not new_state
if trace: #TRACE#
print("blocked") #TRACE#
# Begin inlined: action = self.back_up()
if b_action is not None:
(action, cur_pos, cur_line, cur_line_start,
cur_char, input_state, next_pos) = \
(b_action, b_cur_pos, b_cur_line, b_cur_line_start,
b_cur_char, b_input_state, b_next_pos)
else:
action = None
break # while 1
# End inlined: action = self.back_up()
self.cur_pos = cur_pos
self.cur_line = cur_line
self.cur_line_start = cur_line_start
self.cur_char = cur_char
self.input_state = input_state
self.next_pos = next_pos
if trace: #TRACE#
if action is not None: #TRACE#
print("Doing %s" % action) #TRACE#
return action
def next_char(self):
input_state = self.input_state
if self.trace:
print("Scanner: next: %s [%d] %d" % (" " * 20, input_state, self.cur_pos))
if input_state == 1:
self.cur_pos = self.next_pos
c = self.read_char()
if c == u'\n':
self.cur_char = EOL
self.input_state = 2
elif not c:
self.cur_char = EOL
self.input_state = 4
else:
self.cur_char = c
elif input_state == 2:
self.cur_char = u'\n'
self.input_state = 3
elif input_state == 3:
self.cur_line += 1
self.cur_line_start = self.cur_pos = self.next_pos
self.cur_char = BOL
self.input_state = 1
elif input_state == 4:
self.cur_char = EOF
self.input_state = 5
else: # input_state = 5
self.cur_char = u''
if self.trace:
print("--> [%d] %d %r" % (input_state, self.cur_pos, self.cur_char))
def position(self):
"""
Return a tuple (name, line, col) representing the location of
the last token read using the read() method. |name| is the
name that was provided to the Scanner constructor; |line|
is the line number in the stream (1-based); |col| is the
position within the line of the first character of the token
(0-based).
"""
return (self.name, self.start_line, self.start_col)
def get_position(self):
"""Python accessible wrapper around position(), only for error reporting.
"""
return self.position()
def begin(self, state_name):
"""Set the current state of the scanner to the named state."""
self.initial_state = (
self.lexicon.get_initial_state(state_name))
self.state_name = state_name
def produce(self, value, text=None):
"""
Called from an action procedure, causes |value| to be returned
as the token value from read(). If |text| is supplied, it is
returned in place of the scanned text.
produce() can be called more than once during a single call to an action
procedure, in which case the tokens are queued up and returned one
at a time by subsequent calls to read(), until the queue is empty,
whereupon scanning resumes.
"""
if text is None:
text = self.text
self.queue.append((value, text))
def eof(self):
"""
Override this method if you want something to be done at
end of file.
"""