From 96e07849457099388fba5664bd898c2749f4b8ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=8B=87?= Date: Tue, 20 Jul 2021 22:46:21 +0800 Subject: [PATCH] add upload and uuid lua file --- file_upload/redis.lua | 676 +++++++++++++++++++++++++++++++++++++++++ file_upload/upload.lua | 267 ++++++++++++++++ file_upload/uuid.lua | 103 +++++++ 3 files changed, 1046 insertions(+) create mode 100644 file_upload/redis.lua create mode 100644 file_upload/upload.lua create mode 100644 file_upload/uuid.lua diff --git a/file_upload/redis.lua b/file_upload/redis.lua new file mode 100644 index 0000000..79f9b89 --- /dev/null +++ b/file_upload/redis.lua @@ -0,0 +1,676 @@ +-- Copyright (C) Yichun Zhang (agentzh) + + +local sub = string.sub +local byte = string.byte +local tab_insert = table.insert +local tab_remove = table.remove +local tcp = ngx.socket.tcp +local null = ngx.null +local ipairs = ipairs +local type = type +local pairs = pairs +local unpack = unpack +local setmetatable = setmetatable +local tonumber = tonumber +local tostring = tostring +local rawget = rawget +local select = select +--local error = error + + +local ok, new_tab = pcall(require, "table.new") +if not ok or type(new_tab) ~= "function" then + new_tab = function (narr, nrec) return {} end +end + + +local _M = new_tab(0, 55) + +_M._VERSION = '0.29' + + +local common_cmds = { + "get", "set", "mget", "mset", + "del", "incr", "decr", -- Strings + "llen", "lindex", "lpop", "lpush", + "lrange", "linsert", -- Lists + "hexists", "hget", "hset", "hmget", + --[[ "hmset", ]] "hdel", -- Hashes + "smembers", "sismember", "sadd", "srem", + "sdiff", "sinter", "sunion", -- Sets + "zrange", "zrangebyscore", "zrank", "zadd", + "zrem", "zincrby", -- Sorted Sets + "auth", "eval", "expire", "script", + "sort" -- Others +} + + +local sub_commands = { + "subscribe", "psubscribe" +} + + +local unsub_commands = { + "unsubscribe", "punsubscribe" +} + + +local mt = { __index = _M } + + +function _M.new(self) + local sock, err = tcp() + if not sock then + return nil, err + end + return setmetatable({ _sock = sock, + _subscribed = false, + _n_channel = { + unsubscribe = 0, + punsubscribe = 0, + }, + }, mt) +end + + +function _M.set_timeout(self, timeout) + local sock = rawget(self, "_sock") + if not sock then + error("not initialized", 2) + return + end + + sock:settimeout(timeout) +end + + +function _M.set_timeouts(self, connect_timeout, send_timeout, read_timeout) + local sock = rawget(self, "_sock") + if not sock then + error("not initialized", 2) + return + end + + sock:settimeouts(connect_timeout, send_timeout, read_timeout) +end + + +function _M.connect(self, host, port_or_opts, opts) + local sock = rawget(self, "_sock") + if not sock then + return nil, "not initialized" + end + + local unix + + do + local typ = type(host) + if typ ~= "string" then + error("bad argument #1 host: string expected, got " .. typ, 2) + end + + if sub(host, 1, 5) == "unix:" then + unix = true + end + + if unix then + typ = type(port_or_opts) + if port_or_opts ~= nil and typ ~= "table" then + error("bad argument #2 opts: nil or table expected, got " .. + typ, 2) + end + + else + typ = type(port_or_opts) + if typ ~= "number" then + port_or_opts = tonumber(port_or_opts) + if port_or_opts == nil then + error("bad argument #2 port: number expected, got " .. + typ, 2) + end + end + + if opts ~= nil then + typ = type(opts) + if typ ~= "table" then + error("bad argument #3 opts: nil or table expected, got " .. + typ, 2) + end + end + end + + end + + self._subscribed = false + + local ok, err + + if unix then + -- second argument of sock:connect() cannot be nil + if port_or_opts ~= nil then + ok, err = sock:connect(host, port_or_opts) + opts = port_or_opts + else + ok, err = sock:connect(host) + end + else + ok, err = sock:connect(host, port_or_opts, opts) + end + + if not ok then + return ok, err + end + + if opts and opts.ssl then + ok, err = sock:sslhandshake(false, opts.server_name, opts.ssl_verify) + if not ok then + return ok, "failed to do ssl handshake: " .. err + end + end + + return ok, err +end + + +function _M.set_keepalive(self, ...) + local sock = rawget(self, "_sock") + if not sock then + return nil, "not initialized" + end + + if rawget(self, "_subscribed") then + return nil, "subscribed state" + end + + return sock:setkeepalive(...) +end + + +function _M.get_reused_times(self) + local sock = rawget(self, "_sock") + if not sock then + return nil, "not initialized" + end + + return sock:getreusedtimes() +end + + +local function close(self) + local sock = rawget(self, "_sock") + if not sock then + return nil, "not initialized" + end + + return sock:close() +end +_M.close = close + + +local function _read_reply(self, sock) + local line, err = sock:receive() + if not line then + if err == "timeout" and not rawget(self, "_subscribed") then + sock:close() + end + return nil, err + end + + local prefix = byte(line) + + if prefix == 36 then -- char '$' + -- print("bulk reply") + + local size = tonumber(sub(line, 2)) + if size < 0 then + return null + end + + local data, err = sock:receive(size) + if not data then + if err == "timeout" then + sock:close() + end + return nil, err + end + + local dummy, err = sock:receive(2) -- ignore CRLF + if not dummy then + if err == "timeout" then + sock:close() + end + return nil, err + end + + return data + + elseif prefix == 43 then -- char '+' + -- print("status reply") + + return sub(line, 2) + + elseif prefix == 42 then -- char '*' + local n = tonumber(sub(line, 2)) + + -- print("multi-bulk reply: ", n) + if n < 0 then + return null + end + + local vals = new_tab(n, 0) + local nvals = 0 + for i = 1, n do + local res, err = _read_reply(self, sock) + if res then + nvals = nvals + 1 + vals[nvals] = res + + elseif res == nil then + return nil, err + + else + -- be a valid redis error value + nvals = nvals + 1 + vals[nvals] = {false, err} + end + end + + return vals + + elseif prefix == 58 then -- char ':' + -- print("integer reply") + return tonumber(sub(line, 2)) + + elseif prefix == 45 then -- char '-' + -- print("error reply: ", n) + + return false, sub(line, 2) + + else + -- when `line` is an empty string, `prefix` will be equal to nil. + return nil, "unknown prefix: \"" .. tostring(prefix) .. "\"" + end +end + + +local function _gen_req(args) + local nargs = #args + + local req = new_tab(nargs * 5 + 1, 0) + req[1] = "*" .. nargs .. "\r\n" + local nbits = 2 + + for i = 1, nargs do + local arg = args[i] + if type(arg) ~= "string" then + arg = tostring(arg) + end + + req[nbits] = "$" + req[nbits + 1] = #arg + req[nbits + 2] = "\r\n" + req[nbits + 3] = arg + req[nbits + 4] = "\r\n" + + nbits = nbits + 5 + end + + -- it is much faster to do string concatenation on the C land + -- in real world (large number of strings in the Lua VM) + return req +end + + +local function _check_msg(self, res) + return rawget(self, "_subscribed") and + type(res) == "table" and (res[1] == "message" or res[1] == "pmessage") +end + + +local function _do_cmd(self, ...) + local args = {...} + + local sock = rawget(self, "_sock") + if not sock then + return nil, "not initialized" + end + + local req = _gen_req(args) + + local reqs = rawget(self, "_reqs") + if reqs then + reqs[#reqs + 1] = req + return + end + + -- print("request: ", table.concat(req)) + + local bytes, err = sock:send(req) + if not bytes then + return nil, err + end + + local res, err = _read_reply(self, sock) + while _check_msg(self, res) do + if rawget(self, "_buffered_msg") == nil then + self._buffered_msg = new_tab(1, 0) + end + + tab_insert(self._buffered_msg, res) + res, err = _read_reply(self, sock) + end + + return res, err +end + + +local function _check_unsubscribed(self, res) + if type(res) == "table" + and (res[1] == "unsubscribe" or res[1] == "punsubscribe") + then + self._n_channel[res[1]] = self._n_channel[res[1]] - 1 + + local buffered_msg = rawget(self, "_buffered_msg") + if buffered_msg then + -- remove messages of unsubscribed channel + local msg_type = + (res[1] == "punsubscribe") and "pmessage" or "message" + local j = 1 + for _, msg in ipairs(buffered_msg) do + if msg[1] == msg_type and msg[2] ~= res[2] then + -- move messages to overwrite the removed ones + buffered_msg[j] = msg + j = j + 1 + end + end + + -- clear remain messages + for i = j, #buffered_msg do + buffered_msg[i] = nil + end + + if #buffered_msg == 0 then + self._buffered_msg = nil + end + end + + if res[3] == 0 then + -- all channels are unsubscribed + self._subscribed = false + end + end +end + + +local function _check_subscribed(self, res) + if type(res) == "table" + and (res[1] == "subscribe" or res[1] == "psubscribe") + then + if res[1] == "subscribe" then + self._n_channel.unsubscribe = self._n_channel.unsubscribe + 1 + + elseif res[1] == "psubscribe" then + self._n_channel.punsubscribe = self._n_channel.punsubscribe + 1 + end + end +end + + +function _M.read_reply(self) + local sock = rawget(self, "_sock") + if not sock then + return nil, "not initialized" + end + + if not rawget(self, "_subscribed") then + return nil, "not subscribed" + end + + local buffered_msg = rawget(self, "_buffered_msg") + if buffered_msg then + local msg = buffered_msg[1] + tab_remove(buffered_msg, 1) + + if #buffered_msg == 0 then + self._buffered_msg = nil + end + + return msg + end + + local res, err = _read_reply(self, sock) + _check_unsubscribed(self, res) + + return res, err +end + + +for i = 1, #common_cmds do + local cmd = common_cmds[i] + + _M[cmd] = + function (self, ...) + return _do_cmd(self, cmd, ...) + end +end + + +local function handle_subscribe_result(self, cmd, nargs, res) + local err + _check_subscribed(self, res) + + if nargs <= 1 then + return res + end + + local results = new_tab(nargs, 0) + results[1] = res + local sock = rawget(self, "_sock") + + for i = 2, nargs do + res, err = _read_reply(self, sock) + if not res then + return nil, err + end + + _check_subscribed(self, res) + results[i] = res + end + + return results +end + +for i = 1, #sub_commands do + local cmd = sub_commands[i] + + _M[cmd] = + function (self, ...) + if not rawget(self, "_subscribed") then + self._subscribed = true + end + + local nargs = select("#", ...) + + local res, err = _do_cmd(self, cmd, ...) + if not res then + return nil, err + end + + return handle_subscribe_result(self, cmd, nargs, res) + end +end + + +local function handle_unsubscribe_result(self, cmd, nargs, res) + local err + _check_unsubscribed(self, res) + + if self._n_channel[cmd] == 0 or nargs == 1 then + return res + end + + local results = new_tab(nargs, 0) + results[1] = res + local sock = rawget(self, "_sock") + local i = 2 + + while nargs == 0 or i <= nargs do + res, err = _read_reply(self, sock) + if not res then + return nil, err + end + + results[i] = res + i = i + 1 + + _check_unsubscribed(self, res) + if self._n_channel[cmd] == 0 then + -- exit the loop for unsubscribe() call + break + end + end + + return results +end + +for i = 1, #unsub_commands do + local cmd = unsub_commands[i] + + _M[cmd] = + function (self, ...) + -- assume all channels are unsubscribed by only one time + if not rawget(self, "_subscribed") then + return nil, "not subscribed" + end + + local nargs = select("#", ...) + + local res, err = _do_cmd(self, cmd, ...) + if not res then + return nil, err + end + + return handle_unsubscribe_result(self, cmd, nargs, res) + end +end + + +function _M.hmset(self, hashname, ...) + if select('#', ...) == 1 then + local t = select(1, ...) + + local n = 0 + for k, v in pairs(t) do + n = n + 2 + end + + local array = new_tab(n, 0) + + local i = 0 + for k, v in pairs(t) do + array[i + 1] = k + array[i + 2] = v + i = i + 2 + end + -- print("key", hashname) + return _do_cmd(self, "hmset", hashname, unpack(array)) + end + + -- backwards compatibility + return _do_cmd(self, "hmset", hashname, ...) +end + + +function _M.init_pipeline(self, n) + self._reqs = new_tab(n or 4, 0) +end + + +function _M.cancel_pipeline(self) + self._reqs = nil +end + + +function _M.commit_pipeline(self) + local reqs = rawget(self, "_reqs") + if not reqs then + return nil, "no pipeline" + end + + self._reqs = nil + + local sock = rawget(self, "_sock") + if not sock then + return nil, "not initialized" + end + + local bytes, err = sock:send(reqs) + if not bytes then + return nil, err + end + + local nvals = 0 + local nreqs = #reqs + local vals = new_tab(nreqs, 0) + for i = 1, nreqs do + local res, err = _read_reply(self, sock) + if res then + nvals = nvals + 1 + vals[nvals] = res + + elseif res == nil then + if err == "timeout" then + close(self) + end + return nil, err + + else + -- be a valid redis error value + nvals = nvals + 1 + vals[nvals] = {false, err} + end + end + + return vals +end + + +function _M.array_to_hash(self, t) + local n = #t + -- print("n = ", n) + local h = new_tab(0, n / 2) + for i = 1, n, 2 do + h[t[i]] = t[i + 1] + end + return h +end + + +-- this method is deperate since we already do lazy method generation. +function _M.add_commands(...) + local cmds = {...} + for i = 1, #cmds do + local cmd = cmds[i] + _M[cmd] = + function (self, ...) + return _do_cmd(self, cmd, ...) + end + end +end + + +setmetatable(_M, {__index = function(self, cmd) + local method = + function (self, ...) + return _do_cmd(self, cmd, ...) + end + + -- cache the lazily generated method in our + -- module table + _M[cmd] = method + return method +end}) + + +return _M diff --git a/file_upload/upload.lua b/file_upload/upload.lua new file mode 100644 index 0000000..b1d7aee --- /dev/null +++ b/file_upload/upload.lua @@ -0,0 +1,267 @@ +-- Copyright (C) Yichun Zhang (agentzh) + + +-- local sub = string.sub +local req_socket = ngx.req.socket +local match = string.match +local setmetatable = setmetatable +local type = type +local ngx_var = ngx.var +-- local print = print + + +local _M = { _VERSION = '0.10' } + + +local CHUNK_SIZE = 4096 +local MAX_LINE_SIZE = 512 + +local STATE_BEGIN = 1 +local STATE_READING_HEADER = 2 +local STATE_READING_BODY = 3 +local STATE_EOF = 4 + + +local mt = { __index = _M } + +local state_handlers + + +local function get_boundary() + local header = ngx_var.content_type + if not header then + return nil + end + + if type(header) == "table" then + header = header[1] + end + + local m = match(header, ";%s*boundary=\"([^\"]+)\"") + if m then + return m + end + + return match(header, ";%s*boundary=([^\",;]+)") +end + + +function _M.new(self, chunk_size, max_line_size) + local boundary = get_boundary() + + -- print("boundary: ", boundary) + + if not boundary then + return nil, "no boundary defined in Content-Type" + end + + -- print('boundary: "', boundary, '"') + + local sock, err = req_socket() + if not sock then + return nil, err + end + + local read2boundary, err = sock:receiveuntil("--" .. boundary) + if not read2boundary then + return nil, err + end + + local read_line, err = sock:receiveuntil("\r\n") + if not read_line then + return nil, err + end + + return setmetatable({ + sock = sock, + size = chunk_size or CHUNK_SIZE, + line_size = max_line_size or MAX_LINE_SIZE, + read2boundary = read2boundary, + read_line = read_line, + boundary = boundary, + state = STATE_BEGIN + }, mt) +end + + +function _M.set_timeout(self, timeout) + local sock = self.sock + if not sock then + return nil, "not initialized" + end + + return sock:settimeout(timeout) +end + + +local function discard_line(self) + local read_line = self.read_line + + local line, err = read_line(self.line_size) + if not line then + return nil, err + end + + local dummy, err = read_line(1) + if dummy then + return nil, "line too long: " .. line .. dummy .. "..." + end + + if err then + return nil, err + end + + return 1 +end + + +local function discard_rest(self) + local sock = self.sock + local size = self.size + + while true do + local dummy, err = sock:receive(size) + if err and err ~= 'closed' then + return nil, err + end + + if not dummy then + return 1 + end + end +end + + +local function read_body_part(self) + local read2boundary = self.read2boundary + + local chunk, err = read2boundary(self.size) + if err then + return nil, nil, err + end + + if not chunk then + local sock = self.sock + + local data = sock:receive(2) + if data == "--" then + local ok, err = discard_rest(self) + if not ok then + return nil, nil, err + end + + self.state = STATE_EOF + return "part_end" + end + + if data ~= "\r\n" then + local ok, err = discard_line(self) + if not ok then + return nil, nil, err + end + end + + self.state = STATE_READING_HEADER + return "part_end" + end + + return "body", chunk +end + + +local function read_header(self) + local read_line = self.read_line + + local line, err = read_line(self.line_size) + if err then + return nil, nil, err + end + + local dummy, err = read_line(1) + if dummy then + return nil, nil, "line too long: " .. line .. dummy .. "..." + end + + if err then + return nil, nil, err + end + + -- print("read line: ", line) + + if line == "" then + -- after the last header + self.state = STATE_READING_BODY + return read_body_part(self) + end + + local key, value = match(line, "([^: \t]+)%s*:%s*(.+)") + if not key then + return 'header', line + end + + return 'header', {key, value, line} +end + + +local function eof() + return "eof", nil +end + + +function _M.read(self) + -- local size = self.size + + local handler = state_handlers[self.state] + if handler then + return handler(self) + end + + return nil, nil, "bad state: " .. self.state +end + + +local function read_preamble(self) + local sock = self.sock + if not sock then + return nil, nil, "not initialized" + end + + local size = self.size + local read2boundary = self.read2boundary + + while true do + local preamble = read2boundary(size) + if not preamble then + break + end + + -- discard the preamble data chunk + -- print("read preamble: ", preamble) + end + + local ok, err = discard_line(self) + if not ok then + return nil, nil, err + end + + local read2boundary, err = sock:receiveuntil("\r\n--" .. self.boundary) + if not read2boundary then + return nil, nil, err + end + + self.read2boundary = read2boundary + + self.state = STATE_READING_HEADER + return read_header(self) +end + + +state_handlers = { + read_preamble, + read_header, + read_body_part, + eof +} + + +return _M \ No newline at end of file diff --git a/file_upload/uuid.lua b/file_upload/uuid.lua new file mode 100644 index 0000000..c25e498 --- /dev/null +++ b/file_upload/uuid.lua @@ -0,0 +1,103 @@ +local ffi = require "ffi" +local ffi_new = ffi.new +local ffi_str = ffi.string +local ffi_load = ffi.load +local ffi_cdef = ffi.cdef +local C = ffi.C +local OSX = ffi.os == "OSX" +local pcall = pcall +local assert = assert +local tonumber = tonumber +local setmetatable = setmetatable + +ffi_cdef[[ +typedef unsigned char uuid_t[16]; +typedef long time_t; +typedef struct timeval { + time_t tv_sec; + time_t tv_usec; +} timeval; + void uuid_generate(uuid_t out); + void uuid_generate_random(uuid_t out); + void uuid_generate_time(uuid_t out); + int uuid_generate_time_safe(uuid_t out); + int uuid_parse(const char *in, uuid_t uu); + void uuid_unparse(const uuid_t uu, char *out); + int uuid_type(const uuid_t uu); + int uuid_variant(const uuid_t uu); + time_t uuid_time(const uuid_t uu, struct timeval *ret_tv); +]] + +local function L(n) + local ok, lib = pcall(ffi_load, n) + if ok then return lib end + ok, lib = pcall(ffi_load, n .. '.so.1') + assert(ok, lib) + return lib +end + +local lib = OSX and C or L "uuid" +local uid = ffi_new "uuid_t" +local tvl = ffi_new "timeval" +local buf = ffi_new("char[?]", 36) + +local uuid = {} +local mt = {} + +local function unparse(id) + lib.uuid_unparse(id, buf) + return ffi_str(buf, 36) +end + +local function parse(id) + return lib.uuid_parse(id, uid) == 0 and uid or nil +end + +function uuid.generate() + lib.uuid_generate(uid) + return unparse(uid) +end + +function uuid.generate_random() + lib.uuid_generate_random(uid) + return unparse(uid) +end + +function uuid.generate_time() + lib.uuid_generate_time(uid) + return unparse(uid) +end + +function uuid.generate_time_safe() + assert(not OSX, "uuid_generate_time_safe is not supported on OS X.") + local safe = lib.uuid_generate_time_safe(uid) == 0 + return unparse(uid), safe +end + +function uuid.type(id) + assert(not OSX, "uuid_type is not supported on OS X.") + local parsed = parse(id) + return parsed and lib.uuid_type(parsed) +end + +function uuid.variant(id) + assert(not OSX, "uuid_variant is not supported on OS X.") + local parsed = parse(id) + return parsed and lib.uuid_variant(parsed) +end + +function uuid.time(id) + local parsed = parse(id) + if parsed then + local secs = lib.uuid_time(parsed, tvl) + return tonumber(secs), tonumber(tvl.tv_usec) + end +end + +function uuid.is_valid(id) + return not not parse(id) +end + +mt.__call = uuid.generate + +return setmetatable(uuid, mt) \ No newline at end of file