-
Notifications
You must be signed in to change notification settings - Fork 128
Expand file tree
/
Copy pathio.pxi
More file actions
213 lines (183 loc) · 6.45 KB
/
io.pxi
File metadata and controls
213 lines (183 loc) · 6.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
(ns pixie.io
(:require [pixie.streams :as st :refer :all]
[pixie.io-blocking :as io-blocking]
[pixie.uv :as uv]
[pixie.stacklets :as st]
[pixie.ffi :as ffi]
[pixie.ffi-infer :as ffi-infer]))
;; File-system primitives used by the stream types below. Each form names a
;; function and its argument list; the rest of this file calls them as plain
;; functions, e.g. (fs_open filename flags mode).
;; NOTE(review): presumably each wraps the matching libuv uv_fs_* request and
;; returns its `result` field (negative on error) -- confirm in pixie.uv.
(uv/defuvfsfn fs_open [path flags mode] :result)
(uv/defuvfsfn fs_read [file bufs nbufs offset] :result)
(uv/defuvfsfn fs_write [file bufs nbufs offset] :result)
(uv/defuvfsfn fs_close [file] :result)
;; Buffer size used by FileStream's -reduce and by file-output-rf.
(def DEFAULT-BUFFER-SIZE 1024)
;; A readable, seekable file backed by a descriptor `fp`.
;;   fp     - descriptor returned by fs_open
;;   offset - position in the file at which the next read starts
;;   uvbuf  - uv_buf_t that is pointed at the caller's buffer for each read
(deftype FileStream [fp offset uvbuf]
  IInputStream
  ;; Reads at most `len` bytes at the current offset into `buffer`, advances
  ;; the offset, sets the buffer's count to the number of bytes read and
  ;; returns that number (0 when nothing more could be read).
  (read [this buffer len]
    ;; The request must fit in the buffer. The comparison used to be reversed,
    ;; which rejected short reads and let oversized requests through.
    (assert (<= len (buffer-capacity buffer))
            "Not enough capacity in the buffer")
    (let [_ (pixie.ffi/set! uvbuf :base buffer)
          ;; Honour the requested length rather than always the full capacity.
          _ (pixie.ffi/set! uvbuf :len len)
          read-count (fs_read fp uvbuf 1 offset)]
      (assert (not (neg? read-count)) "Read Error")
      (set-field! this :offset (+ offset read-count))
      (set-buffer-count! buffer read-count)
      read-count))

  ISeekableStream
  ;; Current read offset.
  (position [this]
    offset)
  ;; Moves the read offset back to the start of the file.
  (rewind [this]
    (set-field! this :offset 0))
  ;; Moves the read offset to `pos`.
  (seek [this pos]
    (set-field! this :offset pos))

  IDisposable
  ;; Releases the uv buffer descriptor and closes the file.
  (-dispose! [this]
    (dispose! uvbuf)
    (fs_close fp))

  IReduce
  ;; Reduces `f` over the file's bytes, reading DEFAULT-BUFFER-SIZE bytes at a
  ;; time from the current offset until a read returns nothing.
  (-reduce [this f init]
    (let [buf (buffer DEFAULT-BUFFER-SIZE)
          ;; Keep a `reduced` result wrapped through the inner reduce so early
          ;; termination can be detected here.
          rrf (preserving-reduced f)]
      (loop [acc init]
        (let [read-count (read this buf DEFAULT-BUFFER-SIZE)]
          (if (> read-count 0)
            (let [result (reduce rrf acc buf)]
              (if (not (reduced? result))
                (recur result)
                @result))
            acc))))))
(defn open-read
  {:doc "Open a file for reading, returning an IInputStream.
Throws the uv error name when the file cannot be opened."
   :added "0.1"}
  [filename]
  (assert (string? filename) "Filename must be a string")
  (let [fp (fs_open filename uv/O_RDONLY 0)]
    ;; A negative result is an error code. Fail here instead of handing back a
    ;; stream whose first read would fail with a generic "Read Error".
    (when (neg? fp)
      (throw (uv/uv_err_name fp)))
    ;; Start reading at offset 0 with a fresh uv buffer descriptor.
    (->FileStream fp 0 (uv/uv_buf_t))))
(defn read-line
  "Reads a single line from input-stream, one byte at a time.
  Either a newline or a carriage return ends the line; the terminator is not
  part of the returned string. Returns nil once the stream has no more data
  and nothing was collected."
  [input-stream]
  (let [terminators (into #{} (map int [\newline \return]))
        one-byte (buffer 1)]
    (loop [collected []]
      (let [n (read input-stream one-byte 1)]
        (if (and (pos? n) (not (terminators (first one-byte))))
          ;; Ordinary byte: keep it and continue with the next one.
          (recur (conj collected (first one-byte)))
          (if (and (zero? n) (empty? collected))
            ;; End of input with nothing pending.
            nil
            ;; Terminator seen, or end of input after a partial line.
            (apply str (map char collected))))))))
(defn line-seq
  "Lazy sequence of the lines read from input-stream via read-line.
  The first line is read eagerly; the rest are read as the sequence is
  consumed. input-stream must implement IInputStream."
  [input-stream]
  (let [line (read-line input-stream)]
    (if line
      (cons line (lazy-seq (line-seq input-stream)))
      nil)))
;; A writable file backed by a descriptor `fp`.
;;   fp     - descriptor returned by fs_open
;;   offset - position in the file at which the next write starts
;;   uvbuf  - uv_buf_t that is pointed at the pending part of each buffer
(deftype FileOutputStream [fp offset uvbuf]
  IOutputStream
  ;; Writes the whole of `buffer` (its first (count buffer) bytes) at the
  ;; current offset, repeating fs_write until nothing is pending.
  ;; Returns the byte count of the final fs_write call.
  (write [this buffer]
    (loop [buffer-offset 0]
      (let [_ (pixie.ffi/set! uvbuf :base (ffi/ptr-add buffer buffer-offset))
            _ (pixie.ffi/set! uvbuf :len (- (count buffer) buffer-offset))
            write-count (fs_write fp uvbuf 1 offset)]
        ;; Report the error code of this write. This used to reference an
        ;; undefined `read-count`.
        (when (neg? write-count)
          (throw (uv/uv_err_name write-count)))
        (set-field! this :offset (+ offset write-count))
        ;; A short write leaves a tail; go round again for the remainder.
        (if (< (+ buffer-offset write-count) (count buffer))
          (recur (+ buffer-offset write-count))
          write-count))))

  IDisposable
  ;; Releases the uv buffer descriptor and closes the file, matching
  ;; FileStream. This used to call an undefined `fclose`.
  (-dispose! [this]
    (dispose! uvbuf)
    (fs_close fp)))
;; Collects single bytes into `buffer` and passes them to `downstream` in
;; buffer-sized writes.
;;   downstream - stream that receives the filled buffer via `write`
;;   idx        - index in `buffer` where the next byte is stored
;;   buffer     - staging buffer
(deftype BufferedOutputStream [downstream idx buffer]
IByteOutputStream
;; Stores `val` as an unsigned 8-bit value at `idx`, advances `idx`, and
;; flushes to `downstream` once the buffer is full.
(write-byte [this val]
(pixie.ffi/pack! buffer idx CUInt8 val)
(set-field! this :idx (inc idx))
;; NOTE(review): this comparison relies on `idx` yielding the value just
;; stored by set-field! above -- confirm field symbols are re-read on each use.
(when (= idx (buffer-capacity buffer))
(set-buffer-count! buffer (buffer-capacity buffer))
(write downstream buffer)
(set-field! this :idx 0)))
IDisposable
;; Flushes the bytes collected so far. `downstream` itself is not disposed
;; here.
(-dispose! [this]
(set-buffer-count! buffer idx)
(write downstream buffer)))
;; Serves single bytes out of `buffer`, refilling it from `upstream` when it
;; has been used up.
;;   upstream - stream the buffer is refilled from via `read`
;;   idx      - index in `buffer` of the next byte to hand out
;;   buffer   - staging buffer
(deftype BufferedInputStream [upstream idx buffer]
IByteInputStream
;; Returns the next byte and advances `idx`.
(read-byte [this]
;; All buffered bytes consumed: start again at 0 and refill.
(when (= idx (count buffer))
(set-field! this :idx 0)
(read upstream buffer (buffer-capacity buffer)))
;; NOTE(review): `idx` here is expected to reflect the reset above, and a
;; refill that produced no bytes is not handled explicitly -- confirm what
;; `nth` does in that case.
(let [val (nth buffer idx)]
(set-field! this :idx (inc idx))
val))
IDisposable
;; Disposes both the upstream stream and the staging buffer.
(-dispose! [this]
(dispose! upstream)
(dispose! buffer)))
(defn buffered-output-stream
  "Wraps downstream in a BufferedOutputStream that stages bytes in a new
  buffer of the given size, starting at index 0."
  [downstream size]
  (let [staging (buffer size)]
    (->BufferedOutputStream downstream 0 staging)))
(defn buffered-input-stream
  "Wraps upstream in a BufferedInputStream backed by a new buffer of the given
  size. The buffer's count and the stream's index both start at size, so the
  first read-byte refills from upstream."
  [upstream size]
  (let [staging (buffer size)]
    (set-buffer-count! staging size)
    (->BufferedInputStream upstream size staging)))
(defn throw-on-error
  "Returns result unchanged when it is not negative; otherwise throws the uv
  error name for it."
  [result]
  (if (neg? result)
    (throw (uv/uv_err_name result))
    result))
(defn open-write
  {:doc "Open a file for writing, returning an IOutputStream.
The file is created if needed. Throws the uv error name when it cannot be opened."
   :added "0.1"}
  [filename]
  (assert (string? filename) "Filename must be a string")
  ;; Write-only, create if needed, with mode S_IRWXU.
  ;; NOTE(review): no truncate flag is passed, so an existing longer file
  ;; presumably keeps its tail -- confirm this is intended.
  (->FileOutputStream (throw-on-error (fs_open filename
                                               (bit-or uv/O_WRONLY uv/O_CREAT)
                                               uv/S_IRWXU))
                      0
                      (uv/uv_buf_t)))
(defn file-output-rf
  "Returns a reducing function that writes each integer it receives as one
  byte to filename through a buffered stream of DEFAULT-BUFFER-SIZE bytes.
  The file is opened as soon as this function is called. The zero-argument
  arity yields 0, the completion arity disposes the buffered stream, and the
  step arity returns nil."
  [filename]
  (let [file (open-write filename)
        out (buffered-output-stream file DEFAULT-BUFFER-SIZE)]
    (fn
      ([] 0)
      ;; Completion: dispose the buffered stream.
      ([_] (dispose! out))
      ;; Step: only integers are accepted.
      ([_ chr]
       (assert (integer? chr))
       (write-byte out chr)
       nil))))
(defn spit
  "Writes the string form of val to filename, converting each character with
  int and writing it as one byte."
  [filename val]
  (let [to-int (map int)
        sink (file-output-rf filename)
        text (str val)]
    (transduce to-int sink text)))
(defn slurp
  "Reads all of filename and returns its contents as a string, converting
  each byte with char. The stream is disposed after the read completes."
  [filename]
  (let [in (open-read filename)
        to-char (map char)
        text (transduce to-char string-builder in)]
    ;; NOTE(review): if the transduce throws, this dispose is never reached.
    (dispose! in)
    text))
(defn run-command
  "Runs io-blocking/run-command on command through st/apply-blocking and
  returns what that call returns."
  [command]
  ;; NOTE(review): the ns form aliases both pixie.streams and pixie.stacklets
  ;; as `st`; confirm which one this resolves to.
  (st/apply-blocking io-blocking/run-command
                     command))
;; Disabled experiment: a TCP listener sketch. Nothing in this form is
;; evaluated because it is wrapped in `comment`.
;; As written it checks the argument types, fills a sockaddr_in from ip/port,
;; registers a connection callback that only prints a message when status is
;; not -1, binds and listens with a backlog of 128, then yields control.
;; The `on-connection` argument is not used yet.
(comment
(defn tcp-server [ip port on-connection]
(assert (string? ip) "Ip should be a string")
(assert (integer? port) "Port should be a int")
(let [server (uv/uv_tcp_t)
bind-addr (uv/sockaddr_in)
_ (uv/throw-on-error (uv/uv_ip4_addr ip port bind-addr))
on-new-connetion (atom nil)]
(reset! on-new-connetion
(ffi/ffi-prep-callback
uv/uv_connection_cb
(fn [server status]
(when (not (= status -1))
(println "Got Client!!!!!!!")))))
(uv/uv_tcp_init (uv/uv_default_loop) server)
(uv/uv_tcp_bind server bind-addr 0)
(uv/throw-on-error (uv/uv_listen server 128 @on-new-connetion))
(st/yield-control))))
;; Disabled scratch calls: a println routed through st/apply-blocking, and a
;; tcp-server started on 0.0.0.0:4242 with no connection handler.
(comment
(st/apply-blocking println "FROM OTHER THREAD <---!!!!!")
(tcp-server "0.0.0.0" 4242 nil))
;; Disabled, unfinished experiment: a macro that calls
;; ffi-infer/compile-library with the uv and editline/readline headers at
;; expansion time. The generated `let` has no body yet, and the trailing
;; compile-library call takes no arguments.
(comment
(defmacro make-readline-async []
`(let [libname ~(ffi-infer/compile-library {:prefix "pixie.io.readline"
:includes ["uv.h" "editline/readline.h"]})]))
(ffi-infer/compile-library))