Added initial binary decoder.
[coe.git] / coe / bin.py
CommitLineData
e8a122ff
FT
1from . import data
2
# Primary type codes.  decoder.loadtagged takes these from the low three
# bits of a tag byte; the remaining five bits form a secondary code whose
# meaning depends on the primary type.
T_END, T_INT, T_STR, T_BIT, T_NIL, T_SYM, T_CON = range(7)

# Secondary code for T_INT: the integer is an index into the decoder's
# back-reference table rather than a value of its own.
INT_REF = 1

# Secondary code for T_STR: the string is turned into a symbol in the
# empty ("") namespace.
STR_SYM = 1

# Secondary codes for T_BIT.  The decoder in this file does not consult them.
# NOTE(review): presumably binary / decimal floating point payloads -- confirm.
BIT_BFLOAT, BIT_DFLOAT = 1, 2

# Secondary codes for T_CON, selecting the kind of container.
CON_LIST, CON_SET, CON_MAP, CON_OBJ = range(4)

# Secondary codes for T_NIL; any other secondary code decodes to None.
NIL_FALSE, NIL_TRUE = 1, 2
25
class fmterror(Exception):
    """Base class for errors raised when the binary data is malformed."""
28
class eoferror(fmterror):
    """Raised when the input ends in the middle of a value."""

    def __init__(self):
        message = "unexpected end-of-data"
        super().__init__(message)
32
class referror(fmterror):
    """Raised when a back-reference index lies outside the reference table."""

    def __init__(self):
        message = "bad backref"
        super().__init__(message)
36
class namedtype(type):
    """Metaclass of the classes that decoder.loadobj creates on the fly,
    one per decoded type name."""
39
class decoder(object):
    """Stateful decoder for the tagged binary format.

    Every value is introduced by a tag byte whose low three bits give the
    primary type (``T_*``) and whose upper five bits give a secondary code
    specific to that type.  Each value decoded by loadtagged, other than a
    back-reference itself, registers one entry in ``reftab`` so that later
    ``T_INT``/``INT_REF`` values can refer back to it by index.
    """

    def __init__(self):
        # Back-reference table, in order of registration.
        self.reftab = []
        # Classes created by loadobj, keyed by the decoded type name.
        self.namedtypes = {}

    @staticmethod
    def byte(fp):
        """Read a single byte from *fp* and return it as an int.

        Raises eoferror if the stream has no more data.
        """
        b = fp.read(1)
        if b == b"":
            raise eoferror()
        return b[0]

    @staticmethod
    def loadint(fp):
        """Read a variable-length signed integer from *fp*.

        The integer is stored seven bits per byte, least significant group
        first.  Bit 7 of a byte signals that another byte follows; bit 6 of
        the final byte is the sign bit.

        Raises eoferror if the stream ends before the final byte.
        """
        ret = 0
        p = 0
        while True:
            b = decoder.byte(fp)
            ret += (b & 0x7f) << p
            p += 7
            if (b & 0x80) == 0:
                break
        if (b & 0x40) != 0:
            # Sign-extend from the top bit of the last seven-bit group.
            ret = ret - (1 << p)
        return ret

    @staticmethod
    def loadstr(fp):
        """Read a NUL-terminated UTF-8 string from *fp* and return it as str.

        Raises eoferror if the stream ends before the terminating NUL byte.
        """
        buf = bytearray()
        while True:
            b = decoder.byte(fp)
            if b == 0:
                break
            buf.append(b)
        return buf.decode("utf-8")

    def loadsym(self, fp):
        """Read a symbol (namespace plus name) from *fp*.

        A header byte comes first.  If its lowest bit is set, an integer
        follows that indexes a previously registered symbol in ``reftab``,
        whose namespace is reused; otherwise the namespace is read as a
        string.  The symbol name always follows as a string.

        Raises fmterror if the namespace reference is out of range or does
        not refer to a symbol.
        """
        h = self.byte(fp)
        if h & 0x1:
            nsref = self.loadint(fp)
            if not 0 <= nsref < len(self.reftab):
                raise fmterror("illegal namespace ref: " + str(nsref))
            nssym = self.reftab[nsref]
            if not isinstance(nssym, data.symbol):
                raise fmterror("illegal namespace ref: " + str(nsref))
            ns = nssym.ns
        else:
            ns = self.loadstr(fp)
        nm = self.loadstr(fp)
        ret = data.symbol.get(ns, nm)
        return ret

    def loadlist(self, fp, buf):
        """Append tagged values from *fp* to *buf* until an end tag is read.

        Returns *buf* itself.
        """
        while True:
            tag = self.byte(fp)
            if tag == T_END:
                return buf
            buf.append(self.loadtagged(fp, tag))

    def loadmap(self, fp, buf):
        """Read alternating keys and values from *fp* into the mapping *buf*.

        Reading stops at the first end tag.  If the end tag appears where a
        value was expected, the key read just before it is discarded.

        Returns *buf* itself.
        """
        while True:
            tag = self.byte(fp)
            if tag == T_END:
                return buf
            key = self.loadtagged(fp, tag)
            tag = self.byte(fp)
            if tag == T_END:
                return buf
            buf[key] = self.loadtagged(fp, tag)

    def loadobj(self, fp, ref=False):
        """Read an object: a tagged type name followed by a map of attributes.

        A class is created for each distinct type name (and cached in
        ``namedtypes``); the decoded map is merged into the instance's
        ``__dict__``.  With *ref* true, the object occupies the next slot of
        ``reftab``.
        """
        if ref:
            # Reserve the slot before decoding anything else, so the object
            # gets a lower index than its own type name and attributes.
            refid = len(self.reftab)
            self.reftab.append(None)
        nm = self.load(fp)
        typ = self.namedtypes.get(nm)
        if typ is None:
            typ = self.namedtypes[nm] = namedtype(str(nm), (data.obj, object), {})
            typ.typename = nm
        ret = typ()
        if ref:
            # Fill the slot before reading attributes so that they can
            # refer back to the object that contains them.
            self.reftab[refid] = ret
        ret.__dict__.update(self.loadmap(fp, {}))
        return ret

    def addref(self, obj):
        """Append *obj* to the back-reference table and return it."""
        self.reftab.append(obj)
        return obj

    def loadtagged(self, fp, tag):
        """Decode the value that follows the already-read tag byte *tag*.

        Raises fmterror for an end tag, an unknown primary type or a
        negative byte-string length, referror for an out-of-range
        back-reference, and eoferror if the stream ends prematurely.
        """
        pri, sec = (tag & 0x7), (tag & 0xf8) >> 3
        if pri == T_END:
            raise fmterror("unexpected end-tag")
        elif pri == T_INT:
            if sec == INT_REF:
                idx = self.loadint(fp)
                if not 0 <= idx < len(self.reftab):
                    raise referror()
                return self.reftab[idx]
            return self.addref(self.loadint(fp))
        elif pri == T_STR:
            # The plain string is what gets registered, also for STR_SYM.
            ret = self.addref(self.loadstr(fp))
            if sec == STR_SYM:
                return data.symbol.get("", ret)
            return ret
        elif pri == T_BIT:
            ln = self.loadint(fp)
            if ln < 0:
                # A negative size would make fp.read() consume the whole
                # remainder of the stream instead of failing.
                raise fmterror("negative byte-string length: " + str(ln))
            ret = self.addref(fp.read(ln))
            if len(ret) < ln:
                raise eoferror()
            return ret
        elif pri == T_NIL:
            if sec == NIL_TRUE:
                return self.addref(True)
            elif sec == NIL_FALSE:
                return self.addref(False)
            return self.addref(None)
        elif pri == T_SYM:
            return self.addref(self.loadsym(fp))
        elif pri == T_CON:
            if sec == CON_MAP:
                return self.loadmap(fp, self.addref({}))
            elif sec == CON_OBJ:
                return self.loadobj(fp, ref=True)
            else:
                # CON_LIST, CON_SET and any other secondary code are all
                # decoded into a plain list.
                return self.loadlist(fp, self.addref([]))
        else:
            raise fmterror("unknown primary: " + str(pri))

    def load(self, fp):
        """Read one tag byte from *fp* and return the value it introduces."""
        tag = self.byte(fp)
        return self.loadtagged(fp, tag)
176
def load(fp):
    """Decode a single value from the binary stream *fp* and return it.

    A fresh decoder is created for every call, so back-references are
    never shared between calls.
    """
    return decoder().load(fp)