mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-09-20 04:36:09 +08:00
Compare commits
1 Commits
renovate/g
...
renovate/g
Author | SHA1 | Date | |
---|---|---|---|
18c0f551fe |
56
.github/PULL_REQUEST_TEMPLATE.md
vendored
56
.github/PULL_REQUEST_TEMPLATE.md
vendored
@ -1,56 +0,0 @@
|
||||
<!--
|
||||
Provide a general summary of your changes in the Title above.
|
||||
The PR title must start with `feat(): `, `docs(): `, `fix(): `, `style(): `, or `refactor(): `, `chore(): `. For example: `feat(component): add new feature`.
|
||||
If it spans multiple components, use the main component as the prefix and enumerate in the title, describe in the body.
|
||||
-->
|
||||
<!--
|
||||
在上方标题中提供您更改的总体摘要。
|
||||
PR 标题需以 `feat(): `, `docs(): `, `fix(): `, `style(): `, `refactor(): `, `chore(): ` 其中之一开头,例如:`feat(component): 新增功能`。
|
||||
如果跨多个组件,请使用主要组件作为前缀,并在标题中枚举、描述中说明。
|
||||
-->
|
||||
|
||||
## Description / 描述
|
||||
|
||||
<!-- Describe your changes in detail -->
|
||||
<!-- 详细描述您的更改 -->
|
||||
|
||||
## Motivation and Context / 背景
|
||||
|
||||
<!-- Why is this change required? What problem does it solve? -->
|
||||
<!-- 为什么需要此更改?它解决了什么问题? -->
|
||||
|
||||
<!-- If it fixes an open issue, please link to the issue here. -->
|
||||
<!-- 如果修复了一个打开的issue,请在此处链接到该issue -->
|
||||
|
||||
Closes #XXXX
|
||||
|
||||
<!-- or -->
|
||||
<!-- 或者 -->
|
||||
|
||||
Relates to #XXXX
|
||||
|
||||
## How Has This Been Tested? / 测试
|
||||
|
||||
<!-- Please describe in detail how you tested your changes. -->
|
||||
<!-- 请详细描述您如何测试更改 -->
|
||||
|
||||
## Checklist / 检查清单
|
||||
|
||||
<!-- Go over all the following points, and put an `x` in all the boxes that apply. -->
|
||||
<!-- 检查以下所有要点,并在所有适用的框中打`x` -->
|
||||
|
||||
<!-- If you're unsure about any of these, don't hesitate to ask. We're here to help! -->
|
||||
<!-- 如果您对其中任何一项不确定,请不要犹豫提问。我们会帮助您! -->
|
||||
|
||||
- [ ] I have read the [CONTRIBUTING](https://github.com/OpenListTeam/OpenList/blob/main/CONTRIBUTING.md) document.
|
||||
我已阅读 [CONTRIBUTING](https://github.com/OpenListTeam/OpenList/blob/main/CONTRIBUTING.md) 文档。
|
||||
- [ ] I have formatted my code with `go fmt` or [prettier](https://prettier.io/).
|
||||
我已使用 `go fmt` 或 [prettier](https://prettier.io/) 格式化提交的代码。
|
||||
- [ ] I have added appropriate labels to this PR (or mentioned needed labels in the description if lacking permissions).
|
||||
我已为此 PR 添加了适当的标签(如无权限或需要的标签不存在,请在描述中说明,管理员将后续处理)。
|
||||
- [ ] I have requested review from relevant code authors using the "Request review" feature when applicable.
|
||||
我已在适当情况下使用"Request review"功能请求相关代码作者进行审查。
|
||||
- [ ] I have updated the repository accordingly (If it’s needed).
|
||||
我已相应更新了相关仓库(若适用)。
|
||||
- [ ] [OpenList-Frontend](https://github.com/OpenListTeam/OpenList-Frontend) #XXXX
|
||||
- [ ] [OpenList-Docs](https://github.com/OpenListTeam/OpenList-Docs) #XXXX
|
38
.github/workflows/sync_repo.yml
vendored
38
.github/workflows/sync_repo.yml
vendored
@ -1,38 +0,0 @@
|
||||
name: Sync to Gitee
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
sync:
|
||||
runs-on: ubuntu-latest
|
||||
name: Sync GitHub to Gitee
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Setup SSH
|
||||
run: |
|
||||
mkdir -p ~/.ssh
|
||||
echo "${{ secrets.GITEE_SSH_PRIVATE_KEY }}" > ~/.ssh/id_rsa
|
||||
chmod 600 ~/.ssh/id_rsa
|
||||
ssh-keyscan gitee.com >> ~/.ssh/known_hosts
|
||||
|
||||
- name: Create single commit and push
|
||||
run: |
|
||||
git config user.name "GitHub Actions"
|
||||
git config user.email "actions@github.com"
|
||||
|
||||
# Create a new branch
|
||||
git checkout --orphan new-main
|
||||
git add .
|
||||
git commit -m "Sync from GitHub: $(date)"
|
||||
|
||||
# Add Gitee remote and force push
|
||||
git remote add gitee ${{ vars.GITEE_REPO_URL }}
|
||||
git push --force gitee new-main:main
|
110
CONTRIBUTING.md
110
CONTRIBUTING.md
@ -2,76 +2,106 @@
|
||||
|
||||
## Setup your machine
|
||||
|
||||
`OpenList` is written in [Go](https://golang.org/) and [SolidJS](https://www.solidjs.com/).
|
||||
`OpenList` is written in [Go](https://golang.org/) and [React](https://reactjs.org/).
|
||||
|
||||
Prerequisites:
|
||||
|
||||
- [git](https://git-scm.com)
|
||||
- [Go 1.24+](https://golang.org/doc/install)
|
||||
- [Go 1.20+](https://golang.org/doc/install)
|
||||
- [gcc](https://gcc.gnu.org/)
|
||||
- [nodejs](https://nodejs.org/)
|
||||
|
||||
## Cloning a fork
|
||||
|
||||
Fork and clone `OpenList` and `OpenList-Frontend` anywhere:
|
||||
Clone `OpenList` and `OpenList-Frontend` anywhere:
|
||||
|
||||
```shell
|
||||
$ git clone https://github.com/<your-username>/OpenList.git
|
||||
$ git clone --recurse-submodules https://github.com/<your-username>/OpenList-Frontend.git
|
||||
```
|
||||
|
||||
## Creating a branch
|
||||
|
||||
Create a new branch from the `main` branch, with an appropriate name.
|
||||
|
||||
```shell
|
||||
$ git checkout -b <branch-name>
|
||||
$ git clone https://github.com/OpenListTeam/OpenList.git
|
||||
$ git clone --recurse-submodules https://github.com/OpenListTeam/OpenList-Frontend.git
|
||||
```
|
||||
You should switch to the `main` branch for development.
|
||||
|
||||
## Preview your change
|
||||
|
||||
### backend
|
||||
|
||||
```shell
|
||||
$ go run main.go
|
||||
```
|
||||
|
||||
### frontend
|
||||
|
||||
```shell
|
||||
$ pnpm dev
|
||||
```
|
||||
|
||||
## Add a new driver
|
||||
|
||||
Copy `drivers/template` folder and rename it, and follow the comments in it.
|
||||
|
||||
## Create a commit
|
||||
|
||||
Commit messages should be well formatted, and to make that "standardized".
|
||||
|
||||
Submit your pull request. For PR titles, follow [Conventional Commits](https://www.conventionalcommits.org).
|
||||
### Commit Message Format
|
||||
Each commit message consists of a **header**, a **body** and a **footer**. The header has a special
|
||||
format that includes a **type**, a **scope** and a **subject**:
|
||||
|
||||
https://github.com/OpenListTeam/OpenList/issues/376
|
||||
```
|
||||
<type>(<scope>): <subject>
|
||||
<BLANK LINE>
|
||||
<body>
|
||||
<BLANK LINE>
|
||||
<footer>
|
||||
```
|
||||
|
||||
It's suggested to sign your commits. See: [How to sign commits](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits)
|
||||
The **header** is mandatory and the **scope** of the header is optional.
|
||||
|
||||
Any line of the commit message cannot be longer than 100 characters! This allows the message to be easier
|
||||
to read on GitHub as well as in various git tools.
|
||||
|
||||
### Revert
|
||||
If the commit reverts a previous commit, it should begin with `revert: `, followed by the header
|
||||
of the reverted commit.
|
||||
In the body it should say: `This reverts commit <hash>.`, where the hash is the SHA of the commit
|
||||
being reverted.
|
||||
|
||||
### Type
|
||||
Must be one of the following:
|
||||
|
||||
* **feat**: A new feature
|
||||
* **fix**: A bug fix
|
||||
* **docs**: Documentation only changes
|
||||
* **style**: Changes that do not affect the meaning of the code (white-space, formatting, missing
|
||||
semi-colons, etc)
|
||||
* **refactor**: A code change that neither fixes a bug nor adds a feature
|
||||
* **perf**: A code change that improves performance
|
||||
* **test**: Adding missing or correcting existing tests
|
||||
* **build**: Affects project builds or dependency modifications
|
||||
* **revert**: Restore the previous commit
|
||||
* **ci**: Continuous integration of related file modifications
|
||||
* **chore**: Changes to the build process or auxiliary tools and libraries such as documentation
|
||||
generation
|
||||
* **release**: Release a new version
|
||||
|
||||
### Scope
|
||||
The scope could be anything specifying place of the commit change. For example `$location`,
|
||||
`$browser`, `$compile`, `$rootScope`, `ngHref`, `ngClick`, `ngView`, etc...
|
||||
|
||||
You can use `*` when the change affects more than a single scope.
|
||||
|
||||
### Subject
|
||||
The subject contains succinct description of the change:
|
||||
|
||||
* use the imperative, present tense: "change" not "changed" nor "changes"
|
||||
* don't capitalize first letter
|
||||
* no dot (.) at the end
|
||||
|
||||
### Body
|
||||
Just as in the **subject**, use the imperative, present tense: "change" not "changed" nor "changes".
|
||||
The body should include the motivation for the change and contrast this with previous behavior.
|
||||
|
||||
### Footer
|
||||
The footer should contain any information about **Breaking Changes** and is also the place to
|
||||
[reference GitHub issues that this commit closes](https://help.github.com/articles/closing-issues-via-commit-messages/).
|
||||
|
||||
**Breaking Changes** should start with the word `BREAKING CHANGE:` with a space or two newlines.
|
||||
The rest of the commit message is then used for this.
|
||||
|
||||
## Submit a pull request
|
||||
|
||||
Please make sure your code has been formatted with `go fmt` or [prettier](https://prettier.io/) before submitting.
|
||||
|
||||
Push your branch to your `openlist` fork and open a pull request against the `main` branch.
|
||||
|
||||
## Merge your pull request
|
||||
|
||||
Your pull request will be merged after review. Please wait for the maintainer to merge your pull request after review.
|
||||
|
||||
At least 1 approving review is required by reviewers with write access. You can also request a review from maintainers.
|
||||
|
||||
## Delete your branch
|
||||
|
||||
(Optional) After your pull request is merged, you can delete your branch.
|
||||
|
||||
---
|
||||
|
||||
Thank you for your contribution! Let's make OpenList better together!
|
||||
Push your branch to your `openlist` fork and open a pull request against the
|
||||
`main` branch.
|
||||
|
23
Dockerfile
23
Dockerfile
@ -1,6 +1,3 @@
|
||||
### Default image is base. You can add other support by modifying BASE_IMAGE_TAG. The following parameters are supported: base (default), aria2, ffmpeg, aio
|
||||
ARG BASE_IMAGE_TAG=base
|
||||
|
||||
FROM alpine:edge AS builder
|
||||
LABEL stage=go-builder
|
||||
WORKDIR /app/
|
||||
@ -10,27 +7,21 @@ RUN go mod download
|
||||
COPY ./ ./
|
||||
RUN bash build.sh release docker
|
||||
|
||||
### Default image is base. You can add other support by modifying BASE_IMAGE_TAG. The following parameters are supported: base (default), aria2, ffmpeg, aio
|
||||
ARG BASE_IMAGE_TAG=base
|
||||
FROM openlistteam/openlist-base-image:${BASE_IMAGE_TAG}
|
||||
LABEL MAINTAINER="OpenList"
|
||||
|
||||
ARG INSTALL_FFMPEG=false
|
||||
ARG INSTALL_ARIA2=false
|
||||
ARG USER=openlist
|
||||
ARG UID=1001
|
||||
ARG GID=1001
|
||||
LABEL MAINTAINER="OpenList"
|
||||
|
||||
WORKDIR /opt/openlist/
|
||||
|
||||
RUN addgroup -g ${GID} ${USER} && \
|
||||
adduser -D -u ${UID} -G ${USER} ${USER} && \
|
||||
mkdir -p /opt/openlist/data
|
||||
|
||||
COPY --from=builder --chmod=755 --chown=${UID}:${GID} /app/bin/openlist ./
|
||||
COPY --chmod=755 --chown=${UID}:${GID} entrypoint.sh /entrypoint.sh
|
||||
|
||||
USER ${USER}
|
||||
COPY --chmod=755 --from=builder /app/bin/openlist ./
|
||||
COPY --chmod=755 entrypoint.sh /entrypoint.sh
|
||||
RUN /entrypoint.sh version
|
||||
|
||||
ENV UMASK=022 RUN_ARIA2=${INSTALL_ARIA2}
|
||||
ENV PUID=0 PGID=0 UMASK=022 RUN_ARIA2=${INSTALL_ARIA2}
|
||||
VOLUME /opt/openlist/data/
|
||||
EXPOSE 5244 5245
|
||||
CMD [ "/entrypoint.sh" ]
|
||||
|
@ -1,26 +1,18 @@
|
||||
ARG BASE_IMAGE_TAG=base
|
||||
FROM ghcr.io/openlistteam/openlist-base-image:${BASE_IMAGE_TAG}
|
||||
LABEL MAINTAINER="OpenList"
|
||||
|
||||
ARG TARGETPLATFORM
|
||||
ARG INSTALL_FFMPEG=false
|
||||
ARG INSTALL_ARIA2=false
|
||||
ARG USER=openlist
|
||||
ARG UID=1001
|
||||
ARG GID=1001
|
||||
LABEL MAINTAINER="OpenList"
|
||||
|
||||
WORKDIR /opt/openlist/
|
||||
|
||||
RUN addgroup -g ${GID} ${USER} && \
|
||||
adduser -D -u ${UID} -G ${USER} ${USER} && \
|
||||
mkdir -p /opt/openlist/data
|
||||
|
||||
COPY --chmod=755 --chown=${UID}:${GID} /build/${TARGETPLATFORM}/openlist ./
|
||||
COPY --chmod=755 --chown=${UID}:${GID} entrypoint.sh /entrypoint.sh
|
||||
|
||||
USER ${USER}
|
||||
COPY --chmod=755 /build/${TARGETPLATFORM}/openlist ./
|
||||
COPY --chmod=755 entrypoint.sh /entrypoint.sh
|
||||
RUN /entrypoint.sh version
|
||||
|
||||
ENV UMASK=022 RUN_ARIA2=${INSTALL_ARIA2}
|
||||
ENV PUID=0 PGID=0 UMASK=022 RUN_ARIA2=${INSTALL_ARIA2}
|
||||
VOLUME /opt/openlist/data/
|
||||
EXPOSE 5244 5245
|
||||
CMD [ "/entrypoint.sh" ]
|
@ -6,9 +6,10 @@ services:
|
||||
ports:
|
||||
- '5244:5244'
|
||||
- '5245:5245'
|
||||
user: '0:0'
|
||||
environment:
|
||||
- PUID=0
|
||||
- PGID=0
|
||||
- UMASK=022
|
||||
- TZ=Asia/Shanghai
|
||||
- TZ=UTC
|
||||
container_name: openlist
|
||||
image: 'openlistteam/openlist:latest'
|
||||
|
@ -1,60 +1,43 @@
|
||||
package _115
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
driver115 "github.com/SheltonZhu/115driver/pkg/driver"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
md5Salt = "Qclm8MGWUv59TnrR0XPg"
|
||||
appVer = "35.6.0.3"
|
||||
appVer = "27.0.5.7"
|
||||
)
|
||||
|
||||
func (d *Pan115) getAppVersion() (string, error) {
|
||||
result := VersionResp{}
|
||||
res, err := base.RestyClient.R().Get(driver115.ApiGetVersion)
|
||||
func (d *Pan115) getAppVersion() ([]driver115.AppVersion, error) {
|
||||
result := driver115.VersionResp{}
|
||||
resp, err := base.RestyClient.R().Get(driver115.ApiGetVersion)
|
||||
|
||||
err = driver115.CheckErr(err, &result, resp)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
err = utils.Json.Unmarshal(res.Body(), &result)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(result.Error) > 0 {
|
||||
return "", errors.New(result.Error)
|
||||
}
|
||||
return result.Data.Win.Version, nil
|
||||
|
||||
return result.Data.GetAppVersions(), nil
|
||||
}
|
||||
|
||||
func (d *Pan115) getAppVer() string {
|
||||
ver, err := d.getAppVersion()
|
||||
// todo add some cache?
|
||||
vers, err := d.getAppVersion()
|
||||
if err != nil {
|
||||
log.Warnf("[115] get app version failed: %v", err)
|
||||
return appVer
|
||||
}
|
||||
if len(ver) > 0 {
|
||||
return ver
|
||||
for _, ver := range vers {
|
||||
if ver.AppName == "win" {
|
||||
return ver.Version
|
||||
}
|
||||
}
|
||||
return appVer
|
||||
}
|
||||
|
||||
func (d *Pan115) initAppVer() {
|
||||
appVer = d.getAppVer()
|
||||
log.Debugf("use app version: %v", appVer)
|
||||
}
|
||||
|
||||
type VersionResp struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
Data Versions `json:"data"`
|
||||
}
|
||||
|
||||
type Versions struct {
|
||||
Win Version `json:"win"`
|
||||
}
|
||||
|
||||
type Version struct {
|
||||
Version string `json:"version_code"`
|
||||
}
|
||||
|
@ -186,7 +186,9 @@ func (d *Pan115) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
|
||||
preHash = strings.ToUpper(preHash)
|
||||
fullHash := stream.GetHash().GetHash(utils.SHA1)
|
||||
if len(fullHash) != utils.SHA1.Width {
|
||||
_, fullHash, err = streamPkg.CacheFullAndHash(stream, &up, utils.SHA1)
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, utils.SHA1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -321,7 +321,7 @@ func (d *Pan115) UploadByMultipart(ctx context.Context, params *driver115.Upload
|
||||
err error
|
||||
)
|
||||
|
||||
tmpF, err := s.CacheFullAndWriter(&up, nil)
|
||||
tmpF, err := s.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -239,7 +239,9 @@ func (d *Open115) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
|
||||
}
|
||||
sha1 := file.GetHash().GetHash(utils.SHA1)
|
||||
if len(sha1) != utils.SHA1.Width {
|
||||
_, sha1, err = stream.CacheFullAndHash(file, &up, utils.SHA1)
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, sha1, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.SHA1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -86,14 +86,13 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
|
||||
|
||||
fileSize := stream.GetSize()
|
||||
chunkSize := calPartSize(fileSize)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(chunkSize), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partNum := (stream.GetSize() + chunkSize - 1) / chunkSize
|
||||
parts := make([]oss.UploadPart, partNum)
|
||||
offset := int64(0)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(chunkSize))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i := int64(1); i <= partNum; i++ {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -120,7 +119,7 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
|
||||
retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second))
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -182,7 +182,9 @@ func (d *Pan123) Put(ctx context.Context, dstDir model.Obj, file model.FileStrea
|
||||
etag := file.GetHash().GetHash(utils.MD5)
|
||||
var err error
|
||||
if len(etag) < utils.MD5.Width {
|
||||
_, etag, err = stream.CacheFullAndHash(file, &up, utils.MD5)
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, etag, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.MD5)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -81,12 +81,6 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
|
||||
if size > chunkSize {
|
||||
chunkCount = int((size + chunkSize - 1) / chunkSize)
|
||||
}
|
||||
|
||||
ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lastChunkSize := size % chunkSize
|
||||
if lastChunkSize == 0 {
|
||||
lastChunkSize = chunkSize
|
||||
@ -98,6 +92,10 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
|
||||
batchSize = 10
|
||||
getS3UploadUrl = d.getS3PreSignedUrls
|
||||
}
|
||||
ss, err := stream.NewStreamSectionReader(file, int(chunkSize))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
thread := min(int(chunkCount), d.UploadThread)
|
||||
threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
|
||||
@ -182,7 +180,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
|
||||
return nil
|
||||
},
|
||||
After: func(err error) {
|
||||
ss.FreeSectionReader(reader)
|
||||
ss.RecycleSectionReader(reader)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
@ -69,45 +69,13 @@ func (d *Open123) List(ctx context.Context, dir model.Obj, args model.ListArgs)
|
||||
func (d *Open123) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
|
||||
fileId, _ := strconv.ParseInt(file.GetID(), 10, 64)
|
||||
|
||||
if d.DirectLink {
|
||||
res, err := d.getDirectLink(fileId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if d.DirectLinkPrivateKey == "" {
|
||||
duration := 365 * 24 * time.Hour // 缓存1年
|
||||
return &model.Link{
|
||||
URL: res.Data.URL,
|
||||
Expiration: &duration,
|
||||
}, nil
|
||||
}
|
||||
|
||||
u, err := d.getUserInfo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
duration := time.Duration(d.DirectLinkValidDuration) * time.Minute
|
||||
|
||||
newURL, err := d.SignURL(res.Data.URL, d.DirectLinkPrivateKey,
|
||||
u.Data.UID, duration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &model.Link{
|
||||
URL: newURL,
|
||||
Expiration: &duration,
|
||||
}, nil
|
||||
}
|
||||
|
||||
res, err := d.getDownloadInfo(fileId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &model.Link{URL: res.Data.DownloadUrl}, nil
|
||||
link := model.Link{URL: res.Data.DownloadUrl}
|
||||
return &link, nil
|
||||
}
|
||||
|
||||
func (d *Open123) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error {
|
||||
@ -164,7 +132,9 @@ func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
|
||||
// etag 文件md5
|
||||
etag := file.GetHash().GetHash(utils.MD5)
|
||||
if len(etag) < utils.MD5.Width {
|
||||
_, etag, err = stream.CacheFullAndHash(file, &up, utils.MD5)
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, etag, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.MD5)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -23,11 +23,6 @@ type Addition struct {
|
||||
// 上传线程数
|
||||
UploadThread int `json:"UploadThread" type:"number" default:"3" help:"the threads of upload"`
|
||||
|
||||
// 使用直链
|
||||
DirectLink bool `json:"DirectLink" type:"bool" default:"false" required:"false" help:"use direct link when download file"`
|
||||
DirectLinkPrivateKey string `json:"DirectLinkPrivateKey" required:"false" help:"private key for direct link, if URL authentication is enabled"`
|
||||
DirectLinkValidDuration int64 `json:"DirectLinkValidDuration" type:"number" default:"30" required:"false" help:"minutes, if URL authentication is enabled"`
|
||||
|
||||
driver.RootID
|
||||
}
|
||||
|
||||
|
@ -127,7 +127,7 @@ type RefreshTokenResp struct {
|
||||
type UserInfoResp struct {
|
||||
BaseResp
|
||||
Data struct {
|
||||
UID uint64 `json:"uid"`
|
||||
UID int64 `json:"uid"`
|
||||
Username string `json:"username"`
|
||||
DisplayName string `json:"displayName"`
|
||||
HeadImage string `json:"headImage"`
|
||||
@ -158,13 +158,6 @@ type DownloadInfoResp struct {
|
||||
} `json:"data"`
|
||||
}
|
||||
|
||||
type DirectLinkResp struct {
|
||||
BaseResp
|
||||
Data struct {
|
||||
URL string `json:"url"`
|
||||
} `json:"data"`
|
||||
}
|
||||
|
||||
// 创建文件V2返回
|
||||
type UploadCreateResp struct {
|
||||
BaseResp
|
||||
|
@ -46,12 +46,6 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
||||
uploadDomain := createResp.Data.Servers[0]
|
||||
size := file.GetSize()
|
||||
chunkSize := createResp.Data.SliceSize
|
||||
|
||||
ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploadNums := (size + chunkSize - 1) / chunkSize
|
||||
thread := min(int(uploadNums), d.UploadThread)
|
||||
threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
|
||||
@ -59,6 +53,10 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
||||
retry.Delay(time.Second),
|
||||
retry.DelayType(retry.BackOffDelay))
|
||||
|
||||
ss, err := stream.NewStreamSectionReader(file, int(chunkSize))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for partIndex := range uploadNums {
|
||||
if utils.IsCanceled(uploadCtx) {
|
||||
break
|
||||
@ -70,8 +68,6 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
||||
var reader *stream.SectionReader
|
||||
var rateLimitedRd io.Reader
|
||||
sliceMD5 := ""
|
||||
// 表单
|
||||
b := bytes.NewBuffer(make([]byte, 0, 2048))
|
||||
threadG.GoWithLifecycle(errgroup.Lifecycle{
|
||||
Before: func(ctx context.Context) error {
|
||||
if reader == nil {
|
||||
@ -86,6 +82,7 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
@ -93,8 +90,9 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
||||
// 重置分片reader位置,因为HashReader、上一次失败已经读取到分片EOF
|
||||
reader.Seek(0, io.SeekStart)
|
||||
|
||||
b.Reset()
|
||||
w := multipart.NewWriter(b)
|
||||
// 创建表单数据
|
||||
var b bytes.Buffer
|
||||
w := multipart.NewWriter(&b)
|
||||
// 添加表单字段
|
||||
err = w.WriteField("preuploadID", createResp.Data.PreuploadID)
|
||||
if err != nil {
|
||||
@ -109,20 +107,21 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
||||
return err
|
||||
}
|
||||
// 写入文件内容
|
||||
_, err = w.CreateFormFile("slice", fmt.Sprintf("%s.part%d", file.GetName(), partNumber))
|
||||
fw, err := w.CreateFormFile("slice", fmt.Sprintf("%s.part%d", file.GetName(), partNumber))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = utils.CopyWithBuffer(fw, rateLimitedRd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
headSize := b.Len()
|
||||
err = w.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
head := bytes.NewReader(b.Bytes()[:headSize])
|
||||
tail := bytes.NewReader(b.Bytes()[headSize:])
|
||||
rateLimitedRd = driver.NewLimitedUploadStream(ctx, io.MultiReader(head, reader, tail))
|
||||
|
||||
// 创建请求并设置header
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadDomain+"/upload/v2/file/slice", rateLimitedRd)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadDomain+"/upload/v2/file/slice", &b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -158,7 +157,7 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
||||
return nil
|
||||
},
|
||||
After: func(err error) {
|
||||
ss.FreeSectionReader(reader)
|
||||
ss.RecycleSectionReader(reader)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
@ -1,20 +1,15 @@
|
||||
package _123_open
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -25,8 +20,7 @@ var ( //不同情况下获取的AccessTokenQPS限制不同 如下模块化易于
|
||||
RefreshToken = InitApiInfo(Api+"/api/v1/oauth2/access_token", 1)
|
||||
UserInfo = InitApiInfo(Api+"/api/v1/user/info", 1)
|
||||
FileList = InitApiInfo(Api+"/api/v2/file/list", 3)
|
||||
DownloadInfo = InitApiInfo(Api+"/api/v1/file/download_info", 5)
|
||||
DirectLink = InitApiInfo(Api+"/api/v1/direct-link/url", 5)
|
||||
DownloadInfo = InitApiInfo(Api+"/api/v1/file/download_info", 0)
|
||||
Mkdir = InitApiInfo(Api+"/upload/v1/file/mkdir", 2)
|
||||
Move = InitApiInfo(Api+"/api/v1/file/move", 1)
|
||||
Rename = InitApiInfo(Api+"/api/v1/file/name", 1)
|
||||
@ -86,24 +80,8 @@ func (d *Open123) Request(apiInfo *ApiInfo, method string, callback base.ReqCall
|
||||
}
|
||||
|
||||
func (d *Open123) flushAccessToken() error {
|
||||
if d.ClientID != "" {
|
||||
if d.RefreshToken != "" {
|
||||
var resp RefreshTokenResp
|
||||
_, err := d.Request(RefreshToken, http.MethodPost, func(req *resty.Request) {
|
||||
req.SetQueryParam("client_id", d.ClientID)
|
||||
if d.ClientSecret != "" {
|
||||
req.SetQueryParam("client_secret", d.ClientSecret)
|
||||
}
|
||||
req.SetQueryParam("grant_type", "refresh_token")
|
||||
req.SetQueryParam("refresh_token", d.RefreshToken)
|
||||
}, &resp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.AccessToken = resp.AccessToken
|
||||
d.RefreshToken = resp.RefreshToken
|
||||
op.MustSaveDriverStorage(d)
|
||||
} else if d.ClientSecret != "" {
|
||||
if d.Addition.ClientID != "" {
|
||||
if d.Addition.ClientSecret != "" {
|
||||
var resp AccessTokenResp
|
||||
_, err := d.Request(AccessToken, http.MethodPost, func(req *resty.Request) {
|
||||
req.SetBody(base.Json{
|
||||
@ -116,38 +94,24 @@ func (d *Open123) flushAccessToken() error {
|
||||
}
|
||||
d.AccessToken = resp.Data.AccessToken
|
||||
op.MustSaveDriverStorage(d)
|
||||
} else if d.Addition.RefreshToken != "" {
|
||||
var resp RefreshTokenResp
|
||||
_, err := d.Request(RefreshToken, http.MethodPost, func(req *resty.Request) {
|
||||
req.SetQueryParam("client_id", d.ClientID)
|
||||
req.SetQueryParam("grant_type", "refresh_token")
|
||||
req.SetQueryParam("refresh_token", d.Addition.RefreshToken)
|
||||
}, &resp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.AccessToken = resp.AccessToken
|
||||
d.RefreshToken = resp.RefreshToken
|
||||
op.MustSaveDriverStorage(d)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Open123) SignURL(originURL, privateKey string, uid uint64, validDuration time.Duration) (newURL string, err error) {
|
||||
// 生成Unix时间戳
|
||||
ts := time.Now().Add(validDuration).Unix()
|
||||
|
||||
// 生成随机数(建议使用UUID,不能包含中划线(-))
|
||||
rand := strings.ReplaceAll(uuid.New().String(), "-", "")
|
||||
|
||||
// 解析URL
|
||||
objURL, err := url.Parse(originURL)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// 待签名字符串,格式:path-timestamp-rand-uid-privateKey
|
||||
unsignedStr := fmt.Sprintf("%s-%d-%s-%d-%s", objURL.Path, ts, rand, uid, privateKey)
|
||||
md5Hash := md5.Sum([]byte(unsignedStr))
|
||||
// 生成鉴权参数,格式:timestamp-rand-uid-md5hash
|
||||
authKey := fmt.Sprintf("%d-%s-%d-%x", ts, rand, uid, md5Hash)
|
||||
|
||||
// 添加鉴权参数到URL查询参数
|
||||
v := objURL.Query()
|
||||
v.Add("auth_key", authKey)
|
||||
objURL.RawQuery = v.Encode()
|
||||
|
||||
return objURL.String(), nil
|
||||
}
|
||||
|
||||
func (d *Open123) getUserInfo() (*UserInfoResp, error) {
|
||||
var resp UserInfoResp
|
||||
|
||||
@ -195,21 +159,6 @@ func (d *Open123) getDownloadInfo(fileId int64) (*DownloadInfoResp, error) {
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (d *Open123) getDirectLink(fileId int64) (*DirectLinkResp, error) {
|
||||
var resp DirectLinkResp
|
||||
|
||||
_, err := d.Request(DirectLink, http.MethodGet, func(req *resty.Request) {
|
||||
req.SetQueryParams(map[string]string{
|
||||
"fileId": strconv.FormatInt(fileId, 10),
|
||||
})
|
||||
}, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (d *Open123) mkdir(parentID int64, name string) error {
|
||||
_, err := d.Request(Mkdir, http.MethodPost, func(req *resty.Request) {
|
||||
req.SetBody(base.Json{
|
||||
|
@ -522,7 +522,9 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
|
||||
var err error
|
||||
fullHash := stream.GetHash().GetHash(utils.SHA256)
|
||||
if len(fullHash) != utils.SHA256.Width {
|
||||
_, fullHash, err = streamPkg.CacheFullAndHash(stream, &up, utils.SHA256)
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, utils.SHA256)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -534,15 +536,16 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
|
||||
if size > partSize {
|
||||
part = (size + partSize - 1) / partSize
|
||||
}
|
||||
|
||||
// 生成所有 partInfos
|
||||
partInfos := make([]PartInfo, 0, part)
|
||||
for i := int64(0); i < part; i++ {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
start := i * partSize
|
||||
byteSize := min(size-start, partSize)
|
||||
byteSize := size - start
|
||||
if byteSize > partSize {
|
||||
byteSize = partSize
|
||||
}
|
||||
partNumber := i + 1
|
||||
partInfo := PartInfo{
|
||||
PartNumber: partNumber,
|
||||
@ -590,20 +593,17 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
|
||||
// resp.Data.RapidUpload: true 支持快传,但此处直接检测是否返回分片的上传地址
|
||||
// 快传的情况下同样需要手动处理冲突
|
||||
if resp.Data.PartInfos != nil {
|
||||
// Progress
|
||||
p := driver.NewProgress(size, up)
|
||||
rateLimited := driver.NewLimitedUploadStream(ctx, stream)
|
||||
// 读取前100个分片的上传地址
|
||||
uploadPartInfos := resp.Data.PartInfos
|
||||
|
||||
// 先上传前100个分片
|
||||
err = d.uploadPersonalParts(ctx, partInfos, resp.Data.PartInfos, rateLimited, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 如果还有剩余分片,分批获取上传地址并上传
|
||||
for i := 100; i < len(partInfos); i += 100 {
|
||||
end := min(i+100, len(partInfos))
|
||||
// 获取后续分片的上传地址
|
||||
for i := 101; i < len(partInfos); i += 100 {
|
||||
end := i + 100
|
||||
if end > len(partInfos) {
|
||||
end = len(partInfos)
|
||||
}
|
||||
batchPartInfos := partInfos[i:end]
|
||||
|
||||
moredata := base.Json{
|
||||
"fileId": resp.Data.FileId,
|
||||
"uploadId": resp.Data.UploadId,
|
||||
@ -619,13 +619,44 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = d.uploadPersonalParts(ctx, partInfos, moreresp.Data.PartInfos, rateLimited, p)
|
||||
uploadPartInfos = append(uploadPartInfos, moreresp.Data.PartInfos...)
|
||||
}
|
||||
|
||||
// Progress
|
||||
p := driver.NewProgress(size, up)
|
||||
|
||||
rateLimited := driver.NewLimitedUploadStream(ctx, stream)
|
||||
// 上传所有分片
|
||||
for _, uploadPartInfo := range uploadPartInfos {
|
||||
index := uploadPartInfo.PartNumber - 1
|
||||
partSize := partInfos[index].PartSize
|
||||
log.Debugf("[139] uploading part %+v/%+v", index, len(uploadPartInfos))
|
||||
limitReader := io.LimitReader(rateLimited, partSize)
|
||||
|
||||
// Update Progress
|
||||
r := io.TeeReader(limitReader, p)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadPartInfo.UploadUrl, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/octet-stream")
|
||||
req.Header.Set("Content-Length", fmt.Sprint(partSize))
|
||||
req.Header.Set("Origin", "https://yun.139.com")
|
||||
req.Header.Set("Referer", "https://yun.139.com/")
|
||||
req.ContentLength = partSize
|
||||
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_ = res.Body.Close()
|
||||
log.Debugf("[139] uploaded: %+v", res)
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("unexpected status code: %d", res.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// 全部分片上传完毕后,complete
|
||||
data = base.Json{
|
||||
"contentHash": fullHash,
|
||||
"contentHashAlgorithm": "SHA256",
|
||||
|
@ -1,11 +1,9 @@
|
||||
package _139
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
@ -15,7 +13,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
@ -626,47 +623,3 @@ func (d *Yun139) getPersonalCloudHost() string {
|
||||
}
|
||||
return d.PersonalCloudHost
|
||||
}
|
||||
|
||||
func (d *Yun139) uploadPersonalParts(ctx context.Context, partInfos []PartInfo, uploadPartInfos []PersonalPartInfo, rateLimited *driver.RateLimitReader, p *driver.Progress) error {
|
||||
// 确保数组以 PartNumber 从小到大排序
|
||||
sort.Slice(uploadPartInfos, func(i, j int) bool {
|
||||
return uploadPartInfos[i].PartNumber < uploadPartInfos[j].PartNumber
|
||||
})
|
||||
|
||||
for _, uploadPartInfo := range uploadPartInfos {
|
||||
index := uploadPartInfo.PartNumber - 1
|
||||
if index < 0 || index >= len(partInfos) {
|
||||
return fmt.Errorf("invalid PartNumber %d: index out of bounds (partInfos length: %d)", uploadPartInfo.PartNumber, len(partInfos))
|
||||
}
|
||||
partSize := partInfos[index].PartSize
|
||||
log.Debugf("[139] uploading part %+v/%+v", index, len(partInfos))
|
||||
limitReader := io.LimitReader(rateLimited, partSize)
|
||||
r := io.TeeReader(limitReader, p)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadPartInfo.UploadUrl, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/octet-stream")
|
||||
req.Header.Set("Content-Length", fmt.Sprint(partSize))
|
||||
req.Header.Set("Origin", "https://yun.139.com")
|
||||
req.Header.Set("Referer", "https://yun.139.com/")
|
||||
req.ContentLength = partSize
|
||||
err = func() error {
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
log.Debugf("[139] uploaded: %+v", res)
|
||||
if res.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(res.Body)
|
||||
return fmt.Errorf("unexpected status code: %d, body: %s", res.StatusCode, string(body))
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -5,19 +5,17 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"github.com/skip2/go-qrcode"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/skip2/go-qrcode"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
|
||||
"github.com/go-resty/resty/v2"
|
||||
@ -131,7 +129,6 @@ func (y *Cloud189TV) put(ctx context.Context, url string, headers map[string]str
|
||||
}
|
||||
}
|
||||
|
||||
// 请求完成后http.Client会Close Request.Body
|
||||
resp, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -314,14 +311,11 @@ func (y *Cloud189TV) RapidUpload(ctx context.Context, dstDir model.Obj, stream m
|
||||
|
||||
// 旧版本上传,家庭云不支持覆盖
|
||||
func (y *Cloud189TV) OldUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, isFamily bool, overwrite bool) (model.Obj, error) {
|
||||
fileMd5 := file.GetHash().GetHash(utils.MD5)
|
||||
var tempFile = file.GetFile()
|
||||
var err error
|
||||
if len(fileMd5) != utils.MD5.Width {
|
||||
tempFile, fileMd5, err = stream.CacheFullAndHash(file, &up, utils.MD5)
|
||||
} else if tempFile == nil {
|
||||
tempFile, err = file.CacheFullAndWriter(&up, nil)
|
||||
tempFile, err := file.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fileMd5, err := utils.HashFile(utils.MD5, tempFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -334,10 +328,6 @@ func (y *Cloud189TV) OldUpload(ctx context.Context, dstDir model.Obj, file model
|
||||
|
||||
// 网盘中不存在该文件,开始上传
|
||||
status := GetUploadFileStatusResp{CreateUploadFileResp: *uploadInfo}
|
||||
// driver.RateLimitReader会尝试Close底层的reader
|
||||
// 但这里的tempFile是一个*os.File,Close后就没法继续读了
|
||||
// 所以这里用io.NopCloser包一层
|
||||
rateLimitedRd := driver.NewLimitedUploadStream(ctx, io.NopCloser(tempFile))
|
||||
for status.GetSize() < file.GetSize() && status.FileDataExists != 1 {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
@ -355,7 +345,7 @@ func (y *Cloud189TV) OldUpload(ctx context.Context, dstDir model.Obj, file model
|
||||
header["Edrive-UploadFileId"] = fmt.Sprint(status.UploadFileId)
|
||||
}
|
||||
|
||||
_, err := y.put(ctx, status.FileUploadUrl, header, true, rateLimitedRd, isFamily)
|
||||
_, err := y.put(ctx, status.FileUploadUrl, header, true, io.NopCloser(tempFile), isFamily)
|
||||
if err, ok := err.(*RespErr); ok && err.Code != "InputStreamReadError" {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -472,16 +472,14 @@ func (y *Cloud189PC) refreshSession() (err error) {
|
||||
// 普通上传
|
||||
// 无法上传大小为0的文件
|
||||
func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, isFamily bool, overwrite bool) (model.Obj, error) {
|
||||
// 文件大小
|
||||
fileSize := file.GetSize()
|
||||
// 分片大小,不得为文件大小
|
||||
sliceSize := partSize(fileSize)
|
||||
size := file.GetSize()
|
||||
sliceSize := min(size, partSize(size))
|
||||
|
||||
params := Params{
|
||||
"parentFolderId": dstDir.GetID(),
|
||||
"fileName": url.QueryEscape(file.GetName()),
|
||||
"fileSize": fmt.Sprint(fileSize),
|
||||
"sliceSize": fmt.Sprint(sliceSize), // 必须为特定分片大小
|
||||
"fileSize": fmt.Sprint(file.GetSize()),
|
||||
"sliceSize": fmt.Sprint(sliceSize),
|
||||
"lazyCheck": "1",
|
||||
}
|
||||
|
||||
@ -502,8 +500,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ss, err := stream.NewStreamSectionReader(file, int(sliceSize), &up)
|
||||
ss, err := stream.NewStreamSectionReader(file, int(sliceSize))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -514,10 +511,10 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
|
||||
retry.DelayType(retry.BackOffDelay))
|
||||
|
||||
count := 1
|
||||
if fileSize > sliceSize {
|
||||
count = int((fileSize + sliceSize - 1) / sliceSize)
|
||||
if size > sliceSize {
|
||||
count = int((size + sliceSize - 1) / sliceSize)
|
||||
}
|
||||
lastPartSize := fileSize % sliceSize
|
||||
lastPartSize := size % sliceSize
|
||||
if lastPartSize == 0 {
|
||||
lastPartSize = sliceSize
|
||||
}
|
||||
@ -537,9 +534,9 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
|
||||
break
|
||||
}
|
||||
offset := int64((i)-1) * sliceSize
|
||||
partSize := sliceSize
|
||||
size := sliceSize
|
||||
if i == count {
|
||||
partSize = lastPartSize
|
||||
size = lastPartSize
|
||||
}
|
||||
partInfo := ""
|
||||
var reader *stream.SectionReader
|
||||
@ -548,14 +545,14 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
|
||||
Before: func(ctx context.Context) error {
|
||||
if reader == nil {
|
||||
var err error
|
||||
reader, err = ss.GetSectionReader(offset, partSize)
|
||||
reader, err = ss.GetSectionReader(offset, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
silceMd5.Reset()
|
||||
w, err := utils.CopyWithBuffer(writers, reader)
|
||||
if w != partSize {
|
||||
return fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", partSize, w, err)
|
||||
if w != size {
|
||||
return fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", size, w, err)
|
||||
}
|
||||
// 计算块md5并进行hex和base64编码
|
||||
md5Bytes := silceMd5.Sum(nil)
|
||||
@ -584,7 +581,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
|
||||
return nil
|
||||
},
|
||||
After: func(err error) {
|
||||
ss.FreeSectionReader(reader)
|
||||
ss.RecycleSectionReader(reader)
|
||||
},
|
||||
},
|
||||
)
|
||||
@ -597,7 +594,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
|
||||
fileMd5Hex = strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil)))
|
||||
}
|
||||
sliceMd5Hex := fileMd5Hex
|
||||
if fileSize > sliceSize {
|
||||
if file.GetSize() > sliceSize {
|
||||
sliceMd5Hex = strings.ToUpper(utils.GetMD5EncodeStr(strings.Join(silceMd5Hexs, "\n")))
|
||||
}
|
||||
|
||||
@ -860,7 +857,9 @@ func (y *Cloud189PC) GetMultiUploadUrls(ctx context.Context, isFamily bool, uplo
|
||||
|
||||
// 旧版本上传,家庭云不支持覆盖
|
||||
func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, isFamily bool, overwrite bool) (model.Obj, error) {
|
||||
tempFile, fileMd5, err := stream.CacheFullAndHash(file, &up, utils.MD5)
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
tempFile, fileMd5, err := stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.MD5)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
stdpath "path"
|
||||
"strings"
|
||||
|
||||
@ -13,7 +12,6 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/fs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/sign"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
@ -162,18 +160,25 @@ func (d *Alias) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
|
||||
sign.Sign(reqPath)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
resultLink := *link
|
||||
resultLink.SyncClosers = utils.NewSyncClosers(link)
|
||||
if args.Redirect {
|
||||
return &resultLink, nil
|
||||
return link, nil
|
||||
}
|
||||
|
||||
resultLink := &model.Link{
|
||||
URL: link.URL,
|
||||
Header: link.Header,
|
||||
RangeReader: link.RangeReader,
|
||||
MFile: link.MFile,
|
||||
Concurrency: link.Concurrency,
|
||||
PartSize: link.PartSize,
|
||||
ContentLength: link.ContentLength,
|
||||
SyncClosers: utils.NewSyncClosers(link),
|
||||
}
|
||||
if resultLink.ContentLength == 0 {
|
||||
resultLink.ContentLength = fi.GetSize()
|
||||
}
|
||||
if resultLink.MFile != nil {
|
||||
return &resultLink, nil
|
||||
return resultLink, nil
|
||||
}
|
||||
if d.DownloadConcurrency > 0 {
|
||||
resultLink.Concurrency = d.DownloadConcurrency
|
||||
@ -181,7 +186,7 @@ func (d *Alias) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
|
||||
if d.DownloadPartSize > 0 {
|
||||
resultLink.PartSize = d.DownloadPartSize * utils.KB
|
||||
}
|
||||
return &resultLink, nil
|
||||
return resultLink, nil
|
||||
}
|
||||
return nil, errs.ObjectNotFound
|
||||
}
|
||||
@ -308,29 +313,24 @@ func (d *Alias) Put(ctx context.Context, dstDir model.Obj, s model.FileStreamer,
|
||||
reqPath, err := d.getReqPath(ctx, dstDir, true)
|
||||
if err == nil {
|
||||
if len(reqPath) == 1 {
|
||||
storage, reqActualPath, err := op.GetStorageAndActualPath(*reqPath[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return op.Put(ctx, storage, reqActualPath, &stream.FileStream{
|
||||
Obj: s,
|
||||
Mimetype: s.GetMimetype(),
|
||||
Reader: s,
|
||||
}, up)
|
||||
return fs.PutDirectly(ctx, *reqPath[0], &stream.FileStream{
|
||||
Obj: s,
|
||||
Mimetype: s.GetMimetype(),
|
||||
WebPutAsTask: s.NeedStore(),
|
||||
Reader: s,
|
||||
})
|
||||
} else {
|
||||
file, err := s.CacheFullAndWriter(nil, nil)
|
||||
file, err := s.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count := float64(len(reqPath) + 1)
|
||||
up(100 / count)
|
||||
for i, path := range reqPath {
|
||||
for _, path := range reqPath {
|
||||
err = errors.Join(err, fs.PutDirectly(ctx, *path, &stream.FileStream{
|
||||
Obj: s,
|
||||
Mimetype: s.GetMimetype(),
|
||||
Reader: file,
|
||||
Obj: s,
|
||||
Mimetype: s.GetMimetype(),
|
||||
WebPutAsTask: s.NeedStore(),
|
||||
Reader: file,
|
||||
}))
|
||||
up(float64(i+2) / float64(count) * 100)
|
||||
_, e := file.Seek(0, io.SeekStart)
|
||||
if e != nil {
|
||||
return errors.Join(err, e)
|
||||
@ -402,24 +402,10 @@ func (d *Alias) Extract(ctx context.Context, obj model.Obj, args model.ArchiveIn
|
||||
return nil, errs.ObjectNotFound
|
||||
}
|
||||
for _, dst := range dsts {
|
||||
reqPath := stdpath.Join(dst, sub)
|
||||
link, err := d.extract(ctx, reqPath, args)
|
||||
if err != nil {
|
||||
continue
|
||||
link, err := d.extract(ctx, dst, sub, args)
|
||||
if err == nil {
|
||||
return link, nil
|
||||
}
|
||||
if link == nil {
|
||||
return &model.Link{
|
||||
URL: fmt.Sprintf("%s/ap%s?inner=%s&pass=%s&sign=%s",
|
||||
common.GetApiUrl(ctx),
|
||||
utils.EncodePath(reqPath, true),
|
||||
utils.EncodePath(args.InnerPath, true),
|
||||
url.QueryEscape(args.Password),
|
||||
sign.SignArchive(reqPath)),
|
||||
}, nil
|
||||
}
|
||||
resultLink := *link
|
||||
resultLink.SyncClosers = utils.NewSyncClosers(link)
|
||||
return &resultLink, nil
|
||||
}
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
@ -2,6 +2,8 @@ package alias
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
stdpath "path"
|
||||
"strings"
|
||||
|
||||
@ -10,6 +12,8 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/fs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/sign"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/OpenListTeam/OpenList/v4/server/common"
|
||||
)
|
||||
|
||||
@ -136,7 +140,8 @@ func (d *Alias) listArchive(ctx context.Context, dst, sub string, args model.Arc
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
func (d *Alias) extract(ctx context.Context, reqPath string, args model.ArchiveInnerArgs) (*model.Link, error) {
|
||||
func (d *Alias) extract(ctx context.Context, dst, sub string, args model.ArchiveInnerArgs) (*model.Link, error) {
|
||||
reqPath := stdpath.Join(dst, sub)
|
||||
storage, reqActualPath, err := op.GetStorageAndActualPath(reqPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -144,12 +149,20 @@ func (d *Alias) extract(ctx context.Context, reqPath string, args model.ArchiveI
|
||||
if _, ok := storage.(driver.ArchiveReader); !ok {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
if args.Redirect && common.ShouldProxy(storage, stdpath.Base(reqPath)) {
|
||||
_, err := fs.Get(ctx, reqPath, &fs.GetArgs{NoLog: true})
|
||||
if err == nil {
|
||||
if args.Redirect && common.ShouldProxy(storage, stdpath.Base(sub)) {
|
||||
_, err = fs.Get(ctx, reqPath, &fs.GetArgs{NoLog: true})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
link := &model.Link{
|
||||
URL: fmt.Sprintf("%s/ap%s?inner=%s&pass=%s&sign=%s",
|
||||
common.GetApiUrl(ctx),
|
||||
utils.EncodePath(reqPath, true),
|
||||
utils.EncodePath(args.InnerPath, true),
|
||||
url.QueryEscape(args.Password),
|
||||
sign.SignArchive(reqPath)),
|
||||
}
|
||||
return link, nil
|
||||
}
|
||||
link, _, err := op.DriverExtract(ctx, storage, reqActualPath, args)
|
||||
return link, err
|
||||
|
@ -191,7 +191,9 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
|
||||
|
||||
hash := stream.GetHash().GetHash(utils.SHA1)
|
||||
if len(hash) != utils.SHA1.Width {
|
||||
_, hash, err = streamPkg.CacheFullAndHash(stream, &up, utils.SHA1)
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, hash, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, utils.SHA1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -216,13 +218,14 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
|
||||
if !createResp.RapidUpload {
|
||||
// 2. normal upload
|
||||
log.Debugf("[aliyundive_open] normal upload")
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(partSize), &up)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
preTime := time.Now()
|
||||
var offset, length int64 = 0, partSize
|
||||
//var length
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(partSize))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(createResp.PartInfoList); i++ {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
@ -250,7 +253,7 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
|
||||
retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second))
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/cloudreve"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/cloudreve_v4"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/crypt"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/degoo"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/doubao"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/doubao_share"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/dropbox"
|
||||
@ -49,7 +48,6 @@ import (
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/onedrive_app"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/onedrive_sharelink"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/openlist"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/openlist_share"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/pikpak"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/pikpak_share"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/quark_open"
|
||||
@ -61,7 +59,6 @@ import (
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/smb"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/strm"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/teambition"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/teldrive"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/terabox"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/thunder"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/thunder_browser"
|
||||
|
@ -237,16 +237,15 @@ func (d *Cloudreve) upLocal(ctx context.Context, stream model.FileStreamer, u Up
|
||||
}
|
||||
|
||||
func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploadUrl := u.UploadURLs[0]
|
||||
credential := u.Credential
|
||||
var finish int64 = 0
|
||||
var chunk int = 0
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -295,7 +294,7 @@ func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u U
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -307,14 +306,13 @@ func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u U
|
||||
}
|
||||
|
||||
func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
|
||||
uploadUrl := u.UploadURLs[0]
|
||||
var finish int64 = 0
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT), &up)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploadUrl := u.UploadURLs[0]
|
||||
var finish int64 = 0
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -355,7 +353,7 @@ func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -369,15 +367,14 @@ func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u
|
||||
}
|
||||
|
||||
func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var finish int64 = 0
|
||||
var chunk int = 0
|
||||
var etags []string
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -418,7 +415,7 @@ func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u Uploa
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -252,16 +252,15 @@ func (d *CloudreveV4) upLocal(ctx context.Context, file model.FileStreamer, u Fi
|
||||
}
|
||||
|
||||
func (d *CloudreveV4) upRemote(ctx context.Context, file model.FileStreamer, u FileUploadResp, up driver.UpdateProgress) error {
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := stream.NewStreamSectionReader(file, int(DEFAULT), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploadUrl := u.UploadUrls[0]
|
||||
credential := u.Credential
|
||||
var finish int64 = 0
|
||||
var chunk int = 0
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := stream.NewStreamSectionReader(file, int(DEFAULT))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for finish < file.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -310,7 +309,7 @@ func (d *CloudreveV4) upRemote(ctx context.Context, file model.FileStreamer, u F
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -322,14 +321,13 @@ func (d *CloudreveV4) upRemote(ctx context.Context, file model.FileStreamer, u F
|
||||
}
|
||||
|
||||
func (d *CloudreveV4) upOneDrive(ctx context.Context, file model.FileStreamer, u FileUploadResp, up driver.UpdateProgress) error {
|
||||
uploadUrl := u.UploadUrls[0]
|
||||
var finish int64 = 0
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := stream.NewStreamSectionReader(file, int(DEFAULT), &up)
|
||||
ss, err := stream.NewStreamSectionReader(file, int(DEFAULT))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploadUrl := u.UploadUrls[0]
|
||||
var finish int64 = 0
|
||||
for finish < file.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -371,7 +369,7 @@ func (d *CloudreveV4) upOneDrive(ctx context.Context, file model.FileStreamer, u
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -385,15 +383,14 @@ func (d *CloudreveV4) upOneDrive(ctx context.Context, file model.FileStreamer, u
|
||||
}
|
||||
|
||||
func (d *CloudreveV4) upS3(ctx context.Context, file model.FileStreamer, u FileUploadResp, up driver.UpdateProgress) error {
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := stream.NewStreamSectionReader(file, int(DEFAULT), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var finish int64 = 0
|
||||
var chunk int = 0
|
||||
var etags []string
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
ss, err := stream.NewStreamSectionReader(file, int(DEFAULT))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for finish < file.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -435,7 +432,7 @@ func (d *CloudreveV4) upS3(ctx context.Context, file model.FileStreamer, u FileU
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -401,6 +401,7 @@ func (d *Crypt) Put(ctx context.Context, dstDir model.Obj, streamer model.FileSt
|
||||
},
|
||||
Reader: wrappedIn,
|
||||
Mimetype: "application/octet-stream",
|
||||
WebPutAsTask: streamer.NeedStore(),
|
||||
ForceStreamUpload: true,
|
||||
Exist: streamer.GetExist(),
|
||||
}
|
||||
|
@ -1,203 +0,0 @@
|
||||
package degoo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
)
|
||||
|
||||
type Degoo struct {
|
||||
model.Storage
|
||||
Addition
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
func (d *Degoo) Config() driver.Config {
|
||||
return config
|
||||
}
|
||||
|
||||
func (d *Degoo) GetAddition() driver.Additional {
|
||||
return &d.Addition
|
||||
}
|
||||
|
||||
func (d *Degoo) Init(ctx context.Context) error {
|
||||
|
||||
d.client = base.HttpClient
|
||||
|
||||
// Ensure we have a valid token (will login if needed or refresh if expired)
|
||||
if err := d.ensureValidToken(ctx); err != nil {
|
||||
return fmt.Errorf("failed to initialize token: %w", err)
|
||||
}
|
||||
|
||||
return d.getDevices(ctx)
|
||||
}
|
||||
|
||||
func (d *Degoo) Drop(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Degoo) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
|
||||
items, err := d.getAllFileChildren5(ctx, dir.GetID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return utils.MustSliceConvert(items, func(s DegooFileItem) model.Obj {
|
||||
isFolder := s.Category == 2 || s.Category == 1 || s.Category == 10
|
||||
|
||||
createTime, modTime, _ := humanReadableTimes(s.CreationTime, s.LastModificationTime, s.LastUploadTime)
|
||||
|
||||
size, err := strconv.ParseInt(s.Size, 10, 64)
|
||||
if err != nil {
|
||||
size = 0 // Default to 0 if size parsing fails
|
||||
}
|
||||
|
||||
return &model.Object{
|
||||
ID: s.ID,
|
||||
Path: s.FilePath,
|
||||
Name: s.Name,
|
||||
Size: size,
|
||||
Modified: modTime,
|
||||
Ctime: createTime,
|
||||
IsFolder: isFolder,
|
||||
}
|
||||
}), nil
|
||||
}
|
||||
|
||||
func (d *Degoo) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
|
||||
item, err := d.getOverlay4(ctx, file.GetID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &model.Link{URL: item.URL}, nil
|
||||
}
|
||||
|
||||
func (d *Degoo) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error {
|
||||
// This is done by calling the setUploadFile3 API with a special checksum and size.
|
||||
const query = `mutation SetUploadFile3($Token: String!, $FileInfos: [FileInfoUpload3]!) { setUploadFile3(Token: $Token, FileInfos: $FileInfos) }`
|
||||
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"FileInfos": []map[string]interface{}{
|
||||
{
|
||||
"Checksum": folderChecksum,
|
||||
"Name": dirName,
|
||||
"CreationTime": time.Now().UnixMilli(),
|
||||
"ParentID": parentDir.GetID(),
|
||||
"Size": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := d.apiCall(ctx, "SetUploadFile3", query, variables)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Degoo) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
|
||||
const query = `mutation SetMoveFile($Token: String!, $Copy: Boolean, $NewParentID: String!, $FileIDs: [String]!) { setMoveFile(Token: $Token, Copy: $Copy, NewParentID: $NewParentID, FileIDs: $FileIDs) }`
|
||||
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"Copy": false,
|
||||
"NewParentID": dstDir.GetID(),
|
||||
"FileIDs": []string{srcObj.GetID()},
|
||||
}
|
||||
|
||||
_, err := d.apiCall(ctx, "SetMoveFile", query, variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return srcObj, nil
|
||||
}
|
||||
|
||||
func (d *Degoo) Rename(ctx context.Context, srcObj model.Obj, newName string) error {
|
||||
const query = `mutation SetRenameFile($Token: String!, $FileRenames: [FileRenameInfo]!) { setRenameFile(Token: $Token, FileRenames: $FileRenames) }`
|
||||
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"FileRenames": []DegooFileRenameInfo{
|
||||
{
|
||||
ID: srcObj.GetID(),
|
||||
NewName: newName,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := d.apiCall(ctx, "SetRenameFile", query, variables)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Degoo) Copy(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
|
||||
// Copy is not implemented, Degoo API does not support direct copy.
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
func (d *Degoo) Remove(ctx context.Context, obj model.Obj) error {
|
||||
// Remove deletes a file or folder (moves to trash).
|
||||
const query = `mutation SetDeleteFile5($Token: String!, $IsInRecycleBin: Boolean!, $IDs: [IDType]!) { setDeleteFile5(Token: $Token, IsInRecycleBin: $IsInRecycleBin, IDs: $IDs) }`
|
||||
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"IsInRecycleBin": false,
|
||||
"IDs": []map[string]string{{"FileID": obj.GetID()}},
|
||||
}
|
||||
|
||||
_, err := d.apiCall(ctx, "SetDeleteFile5", query, variables)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *Degoo) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error {
|
||||
tmpF, err := file.CacheFullAndWriter(&up, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
parentID := dstDir.GetID()
|
||||
|
||||
// Calculate the checksum for the file.
|
||||
checksum, err := d.checkSum(tmpF)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 1. Get upload authorization via getBucketWriteAuth4.
|
||||
auths, err := d.getBucketWriteAuth4(ctx, file, parentID, checksum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. Upload file.
|
||||
// support rapid upload
|
||||
if auths.GetBucketWriteAuth4[0].Error != "Already exist!" {
|
||||
err = d.uploadS3(ctx, auths, tmpF, file, checksum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Register metadata with setUploadFile3.
|
||||
data, err := d.SetUploadFile3(ctx, file, parentID, checksum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !data.SetUploadFile3 {
|
||||
return fmt.Errorf("setUploadFile3 failed: %v", data)
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
package degoo
|
||||
|
||||
import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
)
|
||||
|
||||
type Addition struct {
|
||||
driver.RootID
|
||||
Username string `json:"username" help:"Your Degoo account email"`
|
||||
Password string `json:"password" help:"Your Degoo account password"`
|
||||
RefreshToken string `json:"refresh_token" help:"Refresh token for automatic token renewal, obtained automatically"`
|
||||
AccessToken string `json:"access_token" help:"Access token for Degoo API, obtained automatically"`
|
||||
}
|
||||
|
||||
var config = driver.Config{
|
||||
Name: "Degoo",
|
||||
LocalSort: true,
|
||||
DefaultRoot: "0",
|
||||
NoOverwriteUpload: true,
|
||||
}
|
||||
|
||||
func init() {
|
||||
op.RegisterDriver(func() driver.Driver {
|
||||
return &Degoo{}
|
||||
})
|
||||
}
|
@ -1,110 +0,0 @@
|
||||
package degoo
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// DegooLoginRequest represents the login request body.
|
||||
type DegooLoginRequest struct {
|
||||
GenerateToken bool `json:"GenerateToken"`
|
||||
Username string `json:"Username"`
|
||||
Password string `json:"Password"`
|
||||
}
|
||||
|
||||
// DegooLoginResponse represents a successful login response.
|
||||
type DegooLoginResponse struct {
|
||||
Token string `json:"Token"`
|
||||
RefreshToken string `json:"RefreshToken"`
|
||||
}
|
||||
|
||||
// DegooAccessTokenRequest represents the token refresh request body.
|
||||
type DegooAccessTokenRequest struct {
|
||||
RefreshToken string `json:"RefreshToken"`
|
||||
}
|
||||
|
||||
// DegooAccessTokenResponse represents the token refresh response.
|
||||
type DegooAccessTokenResponse struct {
|
||||
AccessToken string `json:"AccessToken"`
|
||||
}
|
||||
|
||||
// DegooFileItem represents a Degoo file or folder.
|
||||
type DegooFileItem struct {
|
||||
ID string `json:"ID"`
|
||||
ParentID string `json:"ParentID"`
|
||||
Name string `json:"Name"`
|
||||
Category int `json:"Category"`
|
||||
Size string `json:"Size"`
|
||||
URL string `json:"URL"`
|
||||
CreationTime string `json:"CreationTime"`
|
||||
LastModificationTime string `json:"LastModificationTime"`
|
||||
LastUploadTime string `json:"LastUploadTime"`
|
||||
MetadataID string `json:"MetadataID"`
|
||||
DeviceID int64 `json:"DeviceID"`
|
||||
FilePath string `json:"FilePath"`
|
||||
IsInRecycleBin bool `json:"IsInRecycleBin"`
|
||||
}
|
||||
|
||||
type DegooErrors struct {
|
||||
Path []string `json:"path"`
|
||||
Data interface{} `json:"data"`
|
||||
ErrorType string `json:"errorType"`
|
||||
ErrorInfo interface{} `json:"errorInfo"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
// DegooGraphqlResponse is the common structure for GraphQL API responses.
|
||||
type DegooGraphqlResponse struct {
|
||||
Data json.RawMessage `json:"data"`
|
||||
Errors []DegooErrors `json:"errors,omitempty"`
|
||||
}
|
||||
|
||||
// DegooGetChildren5Data is the data field for getFileChildren5.
|
||||
type DegooGetChildren5Data struct {
|
||||
GetFileChildren5 struct {
|
||||
Items []DegooFileItem `json:"Items"`
|
||||
NextToken string `json:"NextToken"`
|
||||
} `json:"getFileChildren5"`
|
||||
}
|
||||
|
||||
// DegooGetOverlay4Data is the data field for getOverlay4.
|
||||
type DegooGetOverlay4Data struct {
|
||||
GetOverlay4 DegooFileItem `json:"getOverlay4"`
|
||||
}
|
||||
|
||||
// DegooFileRenameInfo represents a file rename operation.
|
||||
type DegooFileRenameInfo struct {
|
||||
ID string `json:"ID"`
|
||||
NewName string `json:"NewName"`
|
||||
}
|
||||
|
||||
// DegooFileIDs represents a list of file IDs for move operations.
|
||||
type DegooFileIDs struct {
|
||||
FileIDs []string `json:"FileIDs"`
|
||||
}
|
||||
|
||||
// DegooGetBucketWriteAuth4Data is the data field for GetBucketWriteAuth4.
|
||||
type DegooGetBucketWriteAuth4Data struct {
|
||||
GetBucketWriteAuth4 []struct {
|
||||
AuthData struct {
|
||||
PolicyBase64 string `json:"PolicyBase64"`
|
||||
Signature string `json:"Signature"`
|
||||
BaseURL string `json:"BaseURL"`
|
||||
KeyPrefix string `json:"KeyPrefix"`
|
||||
AccessKey struct {
|
||||
Key string `json:"Key"`
|
||||
Value string `json:"Value"`
|
||||
} `json:"AccessKey"`
|
||||
ACL string `json:"ACL"`
|
||||
AdditionalBody []struct {
|
||||
Key string `json:"Key"`
|
||||
Value string `json:"Value"`
|
||||
} `json:"AdditionalBody"`
|
||||
} `json:"AuthData"`
|
||||
Error interface{} `json:"Error"`
|
||||
} `json:"getBucketWriteAuth4"`
|
||||
}
|
||||
|
||||
// DegooSetUploadFile3Data is the data field for SetUploadFile3.
|
||||
type DegooSetUploadFile3Data struct {
|
||||
SetUploadFile3 bool `json:"setUploadFile3"`
|
||||
}
|
@ -1,198 +0,0 @@
|
||||
package degoo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
)
|
||||
|
||||
func (d *Degoo) getBucketWriteAuth4(ctx context.Context, file model.FileStreamer, parentID string, checksum string) (*DegooGetBucketWriteAuth4Data, error) {
|
||||
const query = `query GetBucketWriteAuth4(
|
||||
$Token: String!
|
||||
$ParentID: String!
|
||||
$StorageUploadInfos: [StorageUploadInfo2]
|
||||
) {
|
||||
getBucketWriteAuth4(
|
||||
Token: $Token
|
||||
ParentID: $ParentID
|
||||
StorageUploadInfos: $StorageUploadInfos
|
||||
) {
|
||||
AuthData {
|
||||
PolicyBase64
|
||||
Signature
|
||||
BaseURL
|
||||
KeyPrefix
|
||||
AccessKey {
|
||||
Key
|
||||
Value
|
||||
}
|
||||
ACL
|
||||
AdditionalBody {
|
||||
Key
|
||||
Value
|
||||
}
|
||||
}
|
||||
Error
|
||||
}
|
||||
}`
|
||||
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"ParentID": parentID,
|
||||
"StorageUploadInfos": []map[string]string{{
|
||||
"FileName": file.GetName(),
|
||||
"Checksum": checksum,
|
||||
"Size": strconv.FormatInt(file.GetSize(), 10),
|
||||
}}}
|
||||
|
||||
data, err := d.apiCall(ctx, "GetBucketWriteAuth4", query, variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var resp DegooGetBucketWriteAuth4Data
|
||||
err = json.Unmarshal(data, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// checkSum calculates the SHA1-based checksum for Degoo upload API.
|
||||
func (d *Degoo) checkSum(file io.Reader) (string, error) {
|
||||
seed := []byte{13, 7, 2, 2, 15, 40, 75, 117, 13, 10, 19, 16, 29, 23, 3, 36}
|
||||
hasher := sha1.New()
|
||||
hasher.Write(seed)
|
||||
|
||||
if _, err := utils.CopyWithBuffer(hasher, file); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
cs := hasher.Sum(nil)
|
||||
|
||||
csBytes := []byte{10, byte(len(cs))}
|
||||
csBytes = append(csBytes, cs...)
|
||||
csBytes = append(csBytes, 16, 0)
|
||||
|
||||
return strings.ReplaceAll(base64.StdEncoding.EncodeToString(csBytes), "/", "_"), nil
|
||||
}
|
||||
|
||||
func (d *Degoo) uploadS3(ctx context.Context, auths *DegooGetBucketWriteAuth4Data, tmpF model.File, file model.FileStreamer, checksum string) error {
|
||||
a := auths.GetBucketWriteAuth4[0].AuthData
|
||||
|
||||
_, err := tmpF.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ext := utils.Ext(file.GetName())
|
||||
key := fmt.Sprintf("%s%s/%s.%s", a.KeyPrefix, ext, checksum, ext)
|
||||
|
||||
var b bytes.Buffer
|
||||
w := multipart.NewWriter(&b)
|
||||
err = w.WriteField("key", key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = w.WriteField("acl", a.ACL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = w.WriteField("policy", a.PolicyBase64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = w.WriteField("signature", a.Signature)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = w.WriteField(a.AccessKey.Key, a.AccessKey.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, additional := range a.AdditionalBody {
|
||||
err = w.WriteField(additional.Key, additional.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = w.WriteField("Content-Type", "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = w.CreateFormFile("file", key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
headSize := b.Len()
|
||||
err = w.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
head := bytes.NewReader(b.Bytes()[:headSize])
|
||||
tail := bytes.NewReader(b.Bytes()[headSize:])
|
||||
|
||||
rateLimitedRd := driver.NewLimitedUploadStream(ctx, io.MultiReader(head, tmpF, tail))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.BaseURL, rateLimitedRd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("ngsw-bypass", "1")
|
||||
req.Header.Add("Content-Type", w.FormDataContentType())
|
||||
|
||||
res, err := d.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusNoContent {
|
||||
return fmt.Errorf("upload failed with status code %d", res.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ driver.Driver = (*Degoo)(nil)
|
||||
|
||||
func (d *Degoo) SetUploadFile3(ctx context.Context, file model.FileStreamer, parentID string, checksum string) (*DegooSetUploadFile3Data, error) {
|
||||
const query = `mutation SetUploadFile3($Token: String!, $FileInfos: [FileInfoUpload3]!) {
|
||||
setUploadFile3(Token: $Token, FileInfos: $FileInfos)
|
||||
}`
|
||||
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"FileInfos": []map[string]string{{
|
||||
"Checksum": checksum,
|
||||
"CreationTime": strconv.FormatInt(file.CreateTime().UnixMilli(), 10),
|
||||
"Name": file.GetName(),
|
||||
"ParentID": parentID,
|
||||
"Size": strconv.FormatInt(file.GetSize(), 10),
|
||||
}}}
|
||||
|
||||
data, err := d.apiCall(ctx, "SetUploadFile3", query, variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var resp DegooSetUploadFile3Data
|
||||
err = json.Unmarshal(data, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &resp, nil
|
||||
}
|
@ -1,462 +0,0 @@
|
||||
package degoo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
)
|
||||
|
||||
// Thanks to https://github.com/bernd-wechner/Degoo for API research.
|
||||
|
||||
const (
|
||||
// API endpoints
|
||||
loginURL = "https://rest-api.degoo.com/login"
|
||||
accessTokenURL = "https://rest-api.degoo.com/access-token/v2"
|
||||
apiURL = "https://production-appsync.degoo.com/graphql"
|
||||
|
||||
// API configuration
|
||||
apiKey = "da2-vs6twz5vnjdavpqndtbzg3prra"
|
||||
folderChecksum = "CgAQAg"
|
||||
|
||||
// Token management
|
||||
tokenRefreshThreshold = 5 * time.Minute
|
||||
|
||||
// Rate limiting
|
||||
minRequestInterval = 1 * time.Second
|
||||
|
||||
// Error messages
|
||||
errRateLimited = "rate limited (429), please try again later"
|
||||
errUnauthorized = "unauthorized access"
|
||||
)
|
||||
|
||||
var (
|
||||
// Global rate limiting - protects against concurrent API calls
|
||||
lastRequestTime time.Time
|
||||
requestMutex sync.Mutex
|
||||
)
|
||||
|
||||
// JWT payload structure for token expiration checking
|
||||
type JWTPayload struct {
|
||||
UserID string `json:"userID"`
|
||||
Exp int64 `json:"exp"`
|
||||
Iat int64 `json:"iat"`
|
||||
}
|
||||
|
||||
// Rate limiting helper functions
|
||||
|
||||
// applyRateLimit ensures minimum interval between API requests
|
||||
func applyRateLimit() {
|
||||
requestMutex.Lock()
|
||||
defer requestMutex.Unlock()
|
||||
|
||||
if !lastRequestTime.IsZero() {
|
||||
if elapsed := time.Since(lastRequestTime); elapsed < minRequestInterval {
|
||||
time.Sleep(minRequestInterval - elapsed)
|
||||
}
|
||||
}
|
||||
lastRequestTime = time.Now()
|
||||
}
|
||||
|
||||
// HTTP request helper functions
|
||||
|
||||
// createJSONRequest creates a new HTTP request with JSON body
|
||||
func createJSONRequest(ctx context.Context, method, url string, body interface{}) (*http.Request, error) {
|
||||
jsonBody, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal request body: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(jsonBody))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", base.UserAgent)
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// checkHTTPResponse checks for common HTTP error conditions
|
||||
func checkHTTPResponse(resp *http.Response, operation string) error {
|
||||
if resp.StatusCode == http.StatusTooManyRequests {
|
||||
return fmt.Errorf("%s %s", operation, errRateLimited)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("%s failed: %s", operation, resp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// isTokenExpired checks if the JWT token is expired or will expire soon
|
||||
func (d *Degoo) isTokenExpired() bool {
|
||||
if d.AccessToken == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
payload, err := extractJWTPayload(d.AccessToken)
|
||||
if err != nil {
|
||||
return true // Invalid token format
|
||||
}
|
||||
|
||||
// Check if token expires within the threshold
|
||||
expireTime := time.Unix(payload.Exp, 0)
|
||||
return time.Now().Add(tokenRefreshThreshold).After(expireTime)
|
||||
}
|
||||
|
||||
// extractJWTPayload extracts and parses JWT payload
|
||||
func extractJWTPayload(token string) (*JWTPayload, error) {
|
||||
parts := strings.Split(token, ".")
|
||||
if len(parts) != 3 {
|
||||
return nil, fmt.Errorf("invalid JWT format")
|
||||
}
|
||||
|
||||
// Decode the payload (second part)
|
||||
payload, err := base64.RawURLEncoding.DecodeString(parts[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode JWT payload: %w", err)
|
||||
}
|
||||
|
||||
var jwtPayload JWTPayload
|
||||
if err := json.Unmarshal(payload, &jwtPayload); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse JWT payload: %w", err)
|
||||
}
|
||||
|
||||
return &jwtPayload, nil
|
||||
}
|
||||
|
||||
// refreshToken attempts to refresh the access token using the refresh token
|
||||
func (d *Degoo) refreshToken(ctx context.Context) error {
|
||||
if d.RefreshToken == "" {
|
||||
return fmt.Errorf("no refresh token available")
|
||||
}
|
||||
|
||||
// Create request
|
||||
tokenReq := DegooAccessTokenRequest{RefreshToken: d.RefreshToken}
|
||||
req, err := createJSONRequest(ctx, "POST", accessTokenURL, tokenReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create refresh token request: %w", err)
|
||||
}
|
||||
|
||||
// Execute request
|
||||
resp, err := d.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("refresh token request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Check response
|
||||
if err := checkHTTPResponse(resp, "refresh token"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var accessTokenResp DegooAccessTokenResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&accessTokenResp); err != nil {
|
||||
return fmt.Errorf("failed to parse access token response: %w", err)
|
||||
}
|
||||
|
||||
if accessTokenResp.AccessToken == "" {
|
||||
return fmt.Errorf("empty access token received")
|
||||
}
|
||||
|
||||
d.AccessToken = accessTokenResp.AccessToken
|
||||
// Save the updated token to storage
|
||||
op.MustSaveDriverStorage(d)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureValidToken ensures we have a valid, non-expired token
|
||||
func (d *Degoo) ensureValidToken(ctx context.Context) error {
|
||||
// Check if token is expired or will expire soon
|
||||
if d.isTokenExpired() {
|
||||
// Try to refresh token first if we have a refresh token
|
||||
if d.RefreshToken != "" {
|
||||
if refreshErr := d.refreshToken(ctx); refreshErr == nil {
|
||||
return nil // Successfully refreshed
|
||||
} else {
|
||||
// If refresh failed, fall back to full login
|
||||
fmt.Printf("Token refresh failed, falling back to full login: %v\n", refreshErr)
|
||||
}
|
||||
}
|
||||
|
||||
// Perform full login
|
||||
if d.Username != "" && d.Password != "" {
|
||||
return d.login(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// login performs the login process and retrieves the access token.
|
||||
func (d *Degoo) login(ctx context.Context) error {
|
||||
if d.Username == "" || d.Password == "" {
|
||||
return fmt.Errorf("username or password not provided")
|
||||
}
|
||||
|
||||
creds := DegooLoginRequest{
|
||||
GenerateToken: true,
|
||||
Username: d.Username,
|
||||
Password: d.Password,
|
||||
}
|
||||
|
||||
jsonCreds, err := json.Marshal(creds)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to serialize login credentials: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", loginURL, bytes.NewBuffer(jsonCreds))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create login request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", base.UserAgent)
|
||||
req.Header.Set("Origin", "https://app.degoo.com")
|
||||
|
||||
resp, err := d.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("login request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Handle rate limiting (429 Too Many Requests)
|
||||
if resp.StatusCode == http.StatusTooManyRequests {
|
||||
return fmt.Errorf("login rate limited (429), please try again later")
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("login failed: %s", resp.Status)
|
||||
}
|
||||
|
||||
var loginResp DegooLoginResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&loginResp); err != nil {
|
||||
return fmt.Errorf("failed to parse login response: %w", err)
|
||||
}
|
||||
|
||||
if loginResp.RefreshToken != "" {
|
||||
tokenReq := DegooAccessTokenRequest{RefreshToken: loginResp.RefreshToken}
|
||||
jsonTokenReq, err := json.Marshal(tokenReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to serialize access token request: %w", err)
|
||||
}
|
||||
|
||||
tokenReqHTTP, err := http.NewRequestWithContext(ctx, "POST", accessTokenURL, bytes.NewBuffer(jsonTokenReq))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create access token request: %w", err)
|
||||
}
|
||||
|
||||
tokenReqHTTP.Header.Set("User-Agent", base.UserAgent)
|
||||
|
||||
tokenResp, err := d.client.Do(tokenReqHTTP)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get access token: %w", err)
|
||||
}
|
||||
defer tokenResp.Body.Close()
|
||||
|
||||
var accessTokenResp DegooAccessTokenResponse
|
||||
if err := json.NewDecoder(tokenResp.Body).Decode(&accessTokenResp); err != nil {
|
||||
return fmt.Errorf("failed to parse access token response: %w", err)
|
||||
}
|
||||
d.AccessToken = accessTokenResp.AccessToken
|
||||
d.RefreshToken = loginResp.RefreshToken // Save refresh token
|
||||
} else if loginResp.Token != "" {
|
||||
d.AccessToken = loginResp.Token
|
||||
d.RefreshToken = "" // Direct token, no refresh token available
|
||||
} else {
|
||||
return fmt.Errorf("login failed, no valid token returned")
|
||||
}
|
||||
|
||||
// Save the updated tokens to storage
|
||||
op.MustSaveDriverStorage(d)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// apiCall performs a Degoo GraphQL API request.
|
||||
func (d *Degoo) apiCall(ctx context.Context, operationName, query string, variables map[string]interface{}) (json.RawMessage, error) {
|
||||
// Apply rate limiting
|
||||
applyRateLimit()
|
||||
|
||||
// Ensure we have a valid token before making the API call
|
||||
if err := d.ensureValidToken(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to ensure valid token: %w", err)
|
||||
}
|
||||
|
||||
// Update the Token in variables if it exists (after potential refresh)
|
||||
d.updateTokenInVariables(variables)
|
||||
|
||||
return d.executeGraphQLRequest(ctx, operationName, query, variables)
|
||||
}
|
||||
|
||||
// updateTokenInVariables updates the Token field in GraphQL variables
|
||||
func (d *Degoo) updateTokenInVariables(variables map[string]interface{}) {
|
||||
if variables != nil {
|
||||
if _, hasToken := variables["Token"]; hasToken {
|
||||
variables["Token"] = d.AccessToken
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// executeGraphQLRequest executes a GraphQL request with retry logic
|
||||
func (d *Degoo) executeGraphQLRequest(ctx context.Context, operationName, query string, variables map[string]interface{}) (json.RawMessage, error) {
|
||||
reqBody := map[string]interface{}{
|
||||
"operationName": operationName,
|
||||
"query": query,
|
||||
"variables": variables,
|
||||
}
|
||||
|
||||
// Create and configure request
|
||||
req, err := createJSONRequest(ctx, "POST", apiURL, reqBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Set Degoo-specific headers
|
||||
req.Header.Set("x-api-key", apiKey)
|
||||
if d.AccessToken != "" {
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", d.AccessToken))
|
||||
}
|
||||
|
||||
// Execute request
|
||||
resp, err := d.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("GraphQL API request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Check for HTTP errors
|
||||
if err := checkHTTPResponse(resp, "GraphQL API"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Parse GraphQL response
|
||||
var degooResp DegooGraphqlResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(°ooResp); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode GraphQL response: %w", err)
|
||||
}
|
||||
|
||||
// Handle GraphQL errors
|
||||
if len(degooResp.Errors) > 0 {
|
||||
return d.handleGraphQLError(ctx, degooResp.Errors[0], operationName, query, variables)
|
||||
}
|
||||
|
||||
return degooResp.Data, nil
|
||||
}
|
||||
|
||||
// handleGraphQLError handles GraphQL-level errors with retry logic
|
||||
func (d *Degoo) handleGraphQLError(ctx context.Context, gqlError DegooErrors, operationName, query string, variables map[string]interface{}) (json.RawMessage, error) {
|
||||
if gqlError.ErrorType == "Unauthorized" {
|
||||
// Re-login and retry
|
||||
if err := d.login(ctx); err != nil {
|
||||
return nil, fmt.Errorf("%s, login failed: %w", errUnauthorized, err)
|
||||
}
|
||||
|
||||
// Update token in variables and retry
|
||||
d.updateTokenInVariables(variables)
|
||||
return d.apiCall(ctx, operationName, query, variables)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("GraphQL API error: %s", gqlError.Message)
|
||||
}
|
||||
|
||||
// humanReadableTimes converts Degoo timestamps to Go time.Time.
|
||||
func humanReadableTimes(creation, modification, upload string) (cTime, mTime, uTime time.Time) {
|
||||
cTime, _ = time.Parse(time.RFC3339, creation)
|
||||
if modification != "" {
|
||||
modMillis, _ := strconv.ParseInt(modification, 10, 64)
|
||||
mTime = time.Unix(0, modMillis*int64(time.Millisecond))
|
||||
}
|
||||
if upload != "" {
|
||||
upMillis, _ := strconv.ParseInt(upload, 10, 64)
|
||||
uTime = time.Unix(0, upMillis*int64(time.Millisecond))
|
||||
}
|
||||
return cTime, mTime, uTime
|
||||
}
|
||||
|
||||
// getDevices fetches and caches top-level devices and folders.
|
||||
func (d *Degoo) getDevices(ctx context.Context) error {
|
||||
const query = `query GetFileChildren5($Token: String! $ParentID: String $AllParentIDs: [String] $Limit: Int! $Order: Int! $NextToken: String ) { getFileChildren5(Token: $Token ParentID: $ParentID AllParentIDs: $AllParentIDs Limit: $Limit Order: $Order NextToken: $NextToken) { Items { ParentID } NextToken } }`
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"ParentID": "0",
|
||||
"Limit": 10,
|
||||
"Order": 3,
|
||||
}
|
||||
data, err := d.apiCall(ctx, "GetFileChildren5", query, variables)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var resp DegooGetChildren5Data
|
||||
if err := json.Unmarshal(data, &resp); err != nil {
|
||||
return fmt.Errorf("failed to parse device list: %w", err)
|
||||
}
|
||||
if d.RootFolderID == "0" {
|
||||
if len(resp.GetFileChildren5.Items) > 0 {
|
||||
d.RootFolderID = resp.GetFileChildren5.Items[0].ParentID
|
||||
}
|
||||
op.MustSaveDriverStorage(d)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getAllFileChildren5 fetches all children of a directory with pagination.
|
||||
func (d *Degoo) getAllFileChildren5(ctx context.Context, parentID string) ([]DegooFileItem, error) {
|
||||
const query = `query GetFileChildren5($Token: String! $ParentID: String $AllParentIDs: [String] $Limit: Int! $Order: Int! $NextToken: String ) { getFileChildren5(Token: $Token ParentID: $ParentID AllParentIDs: $AllParentIDs Limit: $Limit Order: $Order NextToken: $NextToken) { Items { ID ParentID Name Category Size CreationTime LastModificationTime LastUploadTime FilePath IsInRecycleBin DeviceID MetadataID } NextToken } }`
|
||||
var allItems []DegooFileItem
|
||||
nextToken := ""
|
||||
for {
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"ParentID": parentID,
|
||||
"Limit": 1000,
|
||||
"Order": 3,
|
||||
}
|
||||
if nextToken != "" {
|
||||
variables["NextToken"] = nextToken
|
||||
}
|
||||
data, err := d.apiCall(ctx, "GetFileChildren5", query, variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var resp DegooGetChildren5Data
|
||||
if err := json.Unmarshal(data, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
allItems = append(allItems, resp.GetFileChildren5.Items...)
|
||||
if resp.GetFileChildren5.NextToken == "" {
|
||||
break
|
||||
}
|
||||
nextToken = resp.GetFileChildren5.NextToken
|
||||
}
|
||||
return allItems, nil
|
||||
}
|
||||
|
||||
// getOverlay4 fetches metadata for a single item by ID.
|
||||
func (d *Degoo) getOverlay4(ctx context.Context, id string) (DegooFileItem, error) {
|
||||
const query = `query GetOverlay4($Token: String!, $ID: IDType!) { getOverlay4(Token: $Token, ID: $ID) { ID ParentID Name Category Size CreationTime LastModificationTime LastUploadTime URL FilePath IsInRecycleBin DeviceID MetadataID } }`
|
||||
variables := map[string]interface{}{
|
||||
"Token": d.AccessToken,
|
||||
"ID": map[string]string{
|
||||
"FileID": id,
|
||||
},
|
||||
}
|
||||
data, err := d.apiCall(ctx, "GetOverlay4", query, variables)
|
||||
if err != nil {
|
||||
return DegooFileItem{}, err
|
||||
}
|
||||
var resp DegooGetOverlay4Data
|
||||
if err := json.Unmarshal(data, &resp); err != nil {
|
||||
return DegooFileItem{}, fmt.Errorf("failed to parse item metadata: %w", err)
|
||||
}
|
||||
return resp.GetOverlay4, nil
|
||||
}
|
@ -449,11 +449,10 @@ func (d *Doubao) uploadNode(uploadConfig *UploadConfig, dir model.Obj, file mode
|
||||
|
||||
// Upload 普通上传实现
|
||||
func (d *Doubao) Upload(ctx context.Context, config *UploadConfig, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, dataType string) (model.Obj, error) {
|
||||
ss, err := stream.NewStreamSectionReader(file, int(file.GetSize()), &up)
|
||||
ss, err := stream.NewStreamSectionReader(file, int(file.GetSize()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader, err := ss.GetSectionReader(0, file.GetSize())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -504,7 +503,7 @@ func (d *Doubao) Upload(ctx context.Context, config *UploadConfig, dstDir model.
|
||||
}
|
||||
return nil
|
||||
})
|
||||
ss.FreeSectionReader(reader)
|
||||
ss.RecycleSectionReader(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -543,15 +542,15 @@ func (d *Doubao) UploadByMultipart(ctx context.Context, config *UploadConfig, fi
|
||||
if config.InnerUploadAddress.AdvanceOption.SliceSize > 0 {
|
||||
chunkSize = int64(config.InnerUploadAddress.AdvanceOption.SliceSize)
|
||||
}
|
||||
ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
totalParts := (fileSize + chunkSize - 1) / chunkSize
|
||||
// 创建分片信息组
|
||||
parts := make([]UploadPart, totalParts)
|
||||
|
||||
// 用 stream.NewStreamSectionReader 替代缓存临时文件
|
||||
ss, err := stream.NewStreamSectionReader(file, int(chunkSize))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
up(10.0) // 更新进度
|
||||
// 设置并行上传
|
||||
thread := min(int(totalParts), d.uploadThread)
|
||||
@ -642,7 +641,7 @@ func (d *Doubao) UploadByMultipart(ctx context.Context, config *UploadConfig, fi
|
||||
return nil
|
||||
},
|
||||
After: func(err error) {
|
||||
ss.FreeSectionReader(reader)
|
||||
ss.RecycleSectionReader(reader)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ type Addition struct {
|
||||
ClientSecret string `json:"client_secret" required:"false" help:"Keep it empty if you don't have one"`
|
||||
AccessToken string
|
||||
RefreshToken string `json:"refresh_token" required:"true"`
|
||||
RootNamespaceId string `json:"RootNamespaceId" required:"false"`
|
||||
RootNamespaceId string
|
||||
}
|
||||
|
||||
var config = driver.Config{
|
||||
|
@ -175,13 +175,6 @@ func (d *Dropbox) finishUploadSession(ctx context.Context, toPath string, offset
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/octet-stream")
|
||||
req.Header.Set("Authorization", "Bearer "+d.AccessToken)
|
||||
if d.RootNamespaceId != "" {
|
||||
apiPathRootJson, err := d.buildPathRootHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Dropbox-API-Path-Root", apiPathRootJson)
|
||||
}
|
||||
|
||||
uploadFinishArgs := UploadFinishArgs{
|
||||
Commit: struct {
|
||||
@ -226,13 +219,6 @@ func (d *Dropbox) startUploadSession(ctx context.Context) (string, error) {
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/octet-stream")
|
||||
req.Header.Set("Authorization", "Bearer "+d.AccessToken)
|
||||
if d.RootNamespaceId != "" {
|
||||
apiPathRootJson, err := d.buildPathRootHeader()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("Dropbox-API-Path-Root", apiPathRootJson)
|
||||
}
|
||||
req.Header.Set("Dropbox-API-Arg", "{\"close\":false}")
|
||||
|
||||
res, err := base.HttpClient.Do(req)
|
||||
@ -247,11 +233,3 @@ func (d *Dropbox) startUploadSession(ctx context.Context) (string, error) {
|
||||
_ = res.Body.Close()
|
||||
return sessionId, nil
|
||||
}
|
||||
|
||||
func (d *Dropbox) buildPathRootHeader() (string, error) {
|
||||
return utils.Json.MarshalToString(map[string]interface{}{
|
||||
".tag": "root",
|
||||
"root": d.RootNamespaceId,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -162,7 +162,7 @@ func (d *GoogleDrive) Put(ctx context.Context, dstDir model.Obj, stream model.Fi
|
||||
SetBody(driver.NewLimitedUploadStream(ctx, stream))
|
||||
}, nil)
|
||||
} else {
|
||||
err = d.chunkUpload(ctx, stream, putUrl, up)
|
||||
err = d.chunkUpload(ctx, stream, putUrl)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -254,14 +254,13 @@ func (d *GoogleDrive) getFiles(id string) ([]File, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (d *GoogleDrive) chunkUpload(ctx context.Context, file model.FileStreamer, url string, up driver.UpdateProgress) error {
|
||||
func (d *GoogleDrive) chunkUpload(ctx context.Context, file model.FileStreamer, url string) error {
|
||||
var defaultChunkSize = d.ChunkSize * 1024 * 1024
|
||||
ss, err := stream.NewStreamSectionReader(file, int(defaultChunkSize), &up)
|
||||
var offset int64 = 0
|
||||
ss, err := stream.NewStreamSectionReader(file, int(defaultChunkSize))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var offset int64 = 0
|
||||
url += "?includeItemsFromAllDrives=true&supportsAllDrives=true"
|
||||
for offset < file.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
@ -301,13 +300,12 @@ func (d *GoogleDrive) chunkUpload(ctx context.Context, file model.FileStreamer,
|
||||
}
|
||||
return fmt.Errorf("%s: %v", e.Error.Message, e.Error.Errors)
|
||||
}
|
||||
up(float64(offset+chunkSize) / float64(file.GetSize()) * 100)
|
||||
return nil
|
||||
},
|
||||
retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second))
|
||||
ss.FreeSectionReader(reader)
|
||||
ss.RecycleSectionReader(reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -276,7 +276,9 @@ func (d *ILanZou) Put(ctx context.Context, dstDir model.Obj, s model.FileStreame
|
||||
etag := s.GetHash().GetHash(utils.MD5)
|
||||
var err error
|
||||
if len(etag) != utils.MD5.Width {
|
||||
_, etag, err = stream.CacheFullAndHash(s, &up, utils.MD5)
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, etag, err = stream.CacheFullInTempFileAndHash(s, cacheFileProgress, utils.MD5)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -296,23 +298,6 @@ func (d *ILanZou) Put(ctx context.Context, dstDir model.Obj, s model.FileStreame
|
||||
return nil, err
|
||||
}
|
||||
upToken := utils.Json.Get(res, "upToken").ToString()
|
||||
if upToken == "-1" {
|
||||
// 支持秒传
|
||||
var resp UploadTokenRapidResp
|
||||
err := utils.Json.Unmarshal(res, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &model.Object{
|
||||
ID: strconv.FormatInt(resp.Map.FileID, 10),
|
||||
Name: resp.Map.FileName,
|
||||
Size: s.GetSize(),
|
||||
Modified: s.ModTime(),
|
||||
Ctime: s.CreateTime(),
|
||||
IsFolder: false,
|
||||
HashInfo: utils.NewHashInfo(utils.MD5, etag),
|
||||
}, nil
|
||||
}
|
||||
now := time.Now()
|
||||
key := fmt.Sprintf("disk/%d/%d/%d/%s/%016d", now.Year(), now.Month(), now.Day(), d.account, now.UnixMilli())
|
||||
reader := driver.NewLimitedUploadStream(ctx, &driver.ReaderUpdatingProgress{
|
||||
|
@ -29,10 +29,9 @@ func init() {
|
||||
op.RegisterDriver(func() driver.Driver {
|
||||
return &ILanZou{
|
||||
config: driver.Config{
|
||||
Name: "ILanZou",
|
||||
DefaultRoot: "0",
|
||||
LocalSort: true,
|
||||
NoOverwriteUpload: true,
|
||||
Name: "ILanZou",
|
||||
DefaultRoot: "0",
|
||||
LocalSort: true,
|
||||
},
|
||||
conf: Conf{
|
||||
base: "https://api.ilanzou.com",
|
||||
@ -48,10 +47,9 @@ func init() {
|
||||
op.RegisterDriver(func() driver.Driver {
|
||||
return &ILanZou{
|
||||
config: driver.Config{
|
||||
Name: "FeijiPan",
|
||||
DefaultRoot: "0",
|
||||
LocalSort: true,
|
||||
NoOverwriteUpload: true,
|
||||
Name: "FeijiPan",
|
||||
DefaultRoot: "0",
|
||||
LocalSort: true,
|
||||
},
|
||||
conf: Conf{
|
||||
base: "https://api.feijipan.com",
|
||||
|
@ -43,18 +43,6 @@ type Part struct {
|
||||
ETag string `json:"etag"`
|
||||
}
|
||||
|
||||
type UploadTokenRapidResp struct {
|
||||
Msg string `json:"msg"`
|
||||
Code int `json:"code"`
|
||||
UpToken string `json:"upToken"`
|
||||
Map struct {
|
||||
FileIconID int `json:"fileIconId"`
|
||||
FileName string `json:"fileName"`
|
||||
FileIcon string `json:"fileIcon"`
|
||||
FileID int64 `json:"fileId"`
|
||||
} `json:"map"`
|
||||
}
|
||||
|
||||
type UploadResultResp struct {
|
||||
Msg string `json:"msg"`
|
||||
Code int `json:"code"`
|
||||
|
@ -180,7 +180,7 @@ func (d *MediaTrack) Put(ctx context.Context, dstDir model.Obj, file model.FileS
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempFile, err := file.CacheFullAndWriter(&up, nil)
|
||||
tempFile, err := file.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -263,7 +263,7 @@ func (d *MoPan) Remove(ctx context.Context, obj model.Obj) error {
|
||||
}
|
||||
|
||||
func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
|
||||
file, err := stream.CacheFullAndWriter(&up, nil)
|
||||
file, err := stream.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -223,7 +223,7 @@ func (d *NeteaseMusic) removeSongObj(file model.Obj) error {
|
||||
}
|
||||
|
||||
func (d *NeteaseMusic) putSongStream(ctx context.Context, stream model.FileStreamer, up driver.UpdateProgress) error {
|
||||
tmp, err := stream.CacheFullAndWriter(&up, nil)
|
||||
tmp, err := stream.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -238,14 +238,13 @@ func (d *Onedrive) upBig(ctx context.Context, dstDir model.Obj, stream model.Fil
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uploadUrl := jsoniter.Get(res, "uploadUrl").ToString()
|
||||
var finish int64 = 0
|
||||
DEFAULT := d.ChunkSize * 1024 * 1024
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT), &up)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploadUrl := jsoniter.Get(res, "uploadUrl").ToString()
|
||||
var finish int64 = 0
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -286,7 +285,7 @@ func (d *Onedrive) upBig(ctx context.Context, dstDir model.Obj, stream model.Fil
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -152,14 +152,13 @@ func (d *OnedriveAPP) upBig(ctx context.Context, dstDir model.Obj, stream model.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uploadUrl := jsoniter.Get(res, "uploadUrl").ToString()
|
||||
var finish int64 = 0
|
||||
DEFAULT := d.ChunkSize * 1024 * 1024
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT), &up)
|
||||
ss, err := streamPkg.NewStreamSectionReader(stream, int(DEFAULT))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploadUrl := jsoniter.Get(res, "uploadUrl").ToString()
|
||||
var finish int64 = 0
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -200,7 +199,7 @@ func (d *OnedriveAPP) upBig(ctx context.Context, dstDir model.Obj, stream model.
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
ss.FreeSectionReader(rd)
|
||||
ss.RecycleSectionReader(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1,181 +0,0 @@
|
||||
package openlist_share
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
stdpath "path"
|
||||
"strings"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/OpenListTeam/OpenList/v4/server/common"
|
||||
"github.com/go-resty/resty/v2"
|
||||
)
|
||||
|
||||
type OpenListShare struct {
|
||||
model.Storage
|
||||
Addition
|
||||
serverArchivePreview bool
|
||||
}
|
||||
|
||||
func (d *OpenListShare) Config() driver.Config {
|
||||
return config
|
||||
}
|
||||
|
||||
func (d *OpenListShare) GetAddition() driver.Additional {
|
||||
return &d.Addition
|
||||
}
|
||||
|
||||
func (d *OpenListShare) Init(ctx context.Context) error {
|
||||
d.Addition.Address = strings.TrimSuffix(d.Addition.Address, "/")
|
||||
var settings common.Resp[map[string]string]
|
||||
_, _, err := d.request("/public/settings", http.MethodGet, func(req *resty.Request) {
|
||||
req.SetResult(&settings)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.serverArchivePreview = settings.Data["share_archive_preview"] == "true"
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *OpenListShare) Drop(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *OpenListShare) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
|
||||
var resp common.Resp[FsListResp]
|
||||
_, _, err := d.request("/fs/list", http.MethodPost, func(req *resty.Request) {
|
||||
req.SetResult(&resp).SetBody(ListReq{
|
||||
PageReq: model.PageReq{
|
||||
Page: 1,
|
||||
PerPage: 0,
|
||||
},
|
||||
Path: stdpath.Join(fmt.Sprintf("/@s/%s", d.ShareId), dir.GetPath()),
|
||||
Password: d.Pwd,
|
||||
Refresh: false,
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var files []model.Obj
|
||||
for _, f := range resp.Data.Content {
|
||||
file := model.ObjThumb{
|
||||
Object: model.Object{
|
||||
Name: f.Name,
|
||||
Modified: f.Modified,
|
||||
Ctime: f.Created,
|
||||
Size: f.Size,
|
||||
IsFolder: f.IsDir,
|
||||
HashInfo: utils.FromString(f.HashInfo),
|
||||
},
|
||||
Thumbnail: model.Thumbnail{Thumbnail: f.Thumb},
|
||||
}
|
||||
files = append(files, &file)
|
||||
}
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (d *OpenListShare) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
|
||||
path := utils.FixAndCleanPath(stdpath.Join(d.ShareId, file.GetPath()))
|
||||
u := fmt.Sprintf("%s/sd%s?pwd=%s", d.Address, path, d.Pwd)
|
||||
return &model.Link{URL: u}, nil
|
||||
}
|
||||
|
||||
func (d *OpenListShare) GetArchiveMeta(ctx context.Context, obj model.Obj, args model.ArchiveArgs) (model.ArchiveMeta, error) {
|
||||
if !d.serverArchivePreview || !d.ForwardArchiveReq {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
var resp common.Resp[ArchiveMetaResp]
|
||||
_, code, err := d.request("/fs/archive/meta", http.MethodPost, func(req *resty.Request) {
|
||||
req.SetResult(&resp).SetBody(ArchiveMetaReq{
|
||||
ArchivePass: args.Password,
|
||||
Path: stdpath.Join(fmt.Sprintf("/@s/%s", d.ShareId), obj.GetPath()),
|
||||
Password: d.Pwd,
|
||||
Refresh: false,
|
||||
})
|
||||
})
|
||||
if code == 202 {
|
||||
return nil, errs.WrongArchivePassword
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tree []model.ObjTree
|
||||
if resp.Data.Content != nil {
|
||||
tree = make([]model.ObjTree, 0, len(resp.Data.Content))
|
||||
for _, content := range resp.Data.Content {
|
||||
tree = append(tree, &content)
|
||||
}
|
||||
}
|
||||
return &model.ArchiveMetaInfo{
|
||||
Comment: resp.Data.Comment,
|
||||
Encrypted: resp.Data.Encrypted,
|
||||
Tree: tree,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *OpenListShare) ListArchive(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) ([]model.Obj, error) {
|
||||
if !d.serverArchivePreview || !d.ForwardArchiveReq {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
var resp common.Resp[ArchiveListResp]
|
||||
_, code, err := d.request("/fs/archive/list", http.MethodPost, func(req *resty.Request) {
|
||||
req.SetResult(&resp).SetBody(ArchiveListReq{
|
||||
ArchiveMetaReq: ArchiveMetaReq{
|
||||
ArchivePass: args.Password,
|
||||
Path: stdpath.Join(fmt.Sprintf("/@s/%s", d.ShareId), obj.GetPath()),
|
||||
Password: d.Pwd,
|
||||
Refresh: false,
|
||||
},
|
||||
PageReq: model.PageReq{
|
||||
Page: 1,
|
||||
PerPage: 0,
|
||||
},
|
||||
InnerPath: args.InnerPath,
|
||||
})
|
||||
})
|
||||
if code == 202 {
|
||||
return nil, errs.WrongArchivePassword
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var files []model.Obj
|
||||
for _, f := range resp.Data.Content {
|
||||
file := model.ObjThumb{
|
||||
Object: model.Object{
|
||||
Name: f.Name,
|
||||
Modified: f.Modified,
|
||||
Ctime: f.Created,
|
||||
Size: f.Size,
|
||||
IsFolder: f.IsDir,
|
||||
HashInfo: utils.FromString(f.HashInfo),
|
||||
},
|
||||
Thumbnail: model.Thumbnail{Thumbnail: f.Thumb},
|
||||
}
|
||||
files = append(files, &file)
|
||||
}
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (d *OpenListShare) Extract(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) (*model.Link, error) {
|
||||
if !d.serverArchivePreview || !d.ForwardArchiveReq {
|
||||
return nil, errs.NotSupport
|
||||
}
|
||||
path := utils.FixAndCleanPath(stdpath.Join(d.ShareId, obj.GetPath()))
|
||||
u := fmt.Sprintf("%s/sad%s?pwd=%s&inner=%s&pass=%s",
|
||||
d.Address,
|
||||
path,
|
||||
d.Pwd,
|
||||
utils.EncodePath(args.InnerPath, true),
|
||||
url.QueryEscape(args.Password))
|
||||
return &model.Link{URL: u}, nil
|
||||
}
|
||||
|
||||
var _ driver.Driver = (*OpenListShare)(nil)
|
@ -1,27 +0,0 @@
|
||||
package openlist_share
|
||||
|
||||
import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
)
|
||||
|
||||
type Addition struct {
|
||||
driver.RootPath
|
||||
Address string `json:"url" required:"true"`
|
||||
ShareId string `json:"sid" required:"true"`
|
||||
Pwd string `json:"pwd"`
|
||||
ForwardArchiveReq bool `json:"forward_archive_requests" default:"true"`
|
||||
}
|
||||
|
||||
var config = driver.Config{
|
||||
Name: "OpenListShare",
|
||||
LocalSort: true,
|
||||
NoUpload: true,
|
||||
DefaultRoot: "/",
|
||||
}
|
||||
|
||||
func init() {
|
||||
op.RegisterDriver(func() driver.Driver {
|
||||
return &OpenListShare{}
|
||||
})
|
||||
}
|
@ -1,111 +0,0 @@
|
||||
package openlist_share
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
)
|
||||
|
||||
type ListReq struct {
|
||||
model.PageReq
|
||||
Path string `json:"path" form:"path"`
|
||||
Password string `json:"password" form:"password"`
|
||||
Refresh bool `json:"refresh"`
|
||||
}
|
||||
|
||||
type ObjResp struct {
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
IsDir bool `json:"is_dir"`
|
||||
Modified time.Time `json:"modified"`
|
||||
Created time.Time `json:"created"`
|
||||
Sign string `json:"sign"`
|
||||
Thumb string `json:"thumb"`
|
||||
Type int `json:"type"`
|
||||
HashInfo string `json:"hashinfo"`
|
||||
}
|
||||
|
||||
type FsListResp struct {
|
||||
Content []ObjResp `json:"content"`
|
||||
Total int64 `json:"total"`
|
||||
Readme string `json:"readme"`
|
||||
Write bool `json:"write"`
|
||||
Provider string `json:"provider"`
|
||||
}
|
||||
|
||||
type ArchiveMetaReq struct {
|
||||
ArchivePass string `json:"archive_pass"`
|
||||
Password string `json:"password"`
|
||||
Path string `json:"path"`
|
||||
Refresh bool `json:"refresh"`
|
||||
}
|
||||
|
||||
type TreeResp struct {
|
||||
ObjResp
|
||||
Children []TreeResp `json:"children"`
|
||||
hashCache *utils.HashInfo
|
||||
}
|
||||
|
||||
func (t *TreeResp) GetSize() int64 {
|
||||
return t.Size
|
||||
}
|
||||
|
||||
func (t *TreeResp) GetName() string {
|
||||
return t.Name
|
||||
}
|
||||
|
||||
func (t *TreeResp) ModTime() time.Time {
|
||||
return t.Modified
|
||||
}
|
||||
|
||||
func (t *TreeResp) CreateTime() time.Time {
|
||||
return t.Created
|
||||
}
|
||||
|
||||
func (t *TreeResp) IsDir() bool {
|
||||
return t.ObjResp.IsDir
|
||||
}
|
||||
|
||||
func (t *TreeResp) GetHash() utils.HashInfo {
|
||||
return utils.FromString(t.HashInfo)
|
||||
}
|
||||
|
||||
func (t *TreeResp) GetID() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (t *TreeResp) GetPath() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (t *TreeResp) GetChildren() []model.ObjTree {
|
||||
ret := make([]model.ObjTree, 0, len(t.Children))
|
||||
for _, child := range t.Children {
|
||||
ret = append(ret, &child)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (t *TreeResp) Thumb() string {
|
||||
return t.ObjResp.Thumb
|
||||
}
|
||||
|
||||
type ArchiveMetaResp struct {
|
||||
Comment string `json:"comment"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
Content []TreeResp `json:"content"`
|
||||
RawURL string `json:"raw_url"`
|
||||
Sign string `json:"sign"`
|
||||
}
|
||||
|
||||
type ArchiveListReq struct {
|
||||
model.PageReq
|
||||
ArchiveMetaReq
|
||||
InnerPath string `json:"inner_path"`
|
||||
}
|
||||
|
||||
type ArchiveListResp struct {
|
||||
Content []ObjResp `json:"content"`
|
||||
Total int64 `json:"total"`
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
package openlist_share
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
)
|
||||
|
||||
func (d *OpenListShare) request(api, method string, callback base.ReqCallback) ([]byte, int, error) {
|
||||
url := d.Address + "/api" + api
|
||||
req := base.RestyClient.R()
|
||||
if callback != nil {
|
||||
callback(req)
|
||||
}
|
||||
res, err := req.Execute(method, url)
|
||||
if err != nil {
|
||||
code := 0
|
||||
if res != nil {
|
||||
code = res.StatusCode()
|
||||
}
|
||||
return nil, code, err
|
||||
}
|
||||
if res.StatusCode() >= 400 {
|
||||
return nil, res.StatusCode(), fmt.Errorf("request failed, status: %s", res.Status())
|
||||
}
|
||||
code := utils.Json.Get(res.Body(), "code").ToInt()
|
||||
if code != 200 {
|
||||
return nil, code, fmt.Errorf("request failed, code: %d, message: %s", code, utils.Json.Get(res.Body(), "message").ToString())
|
||||
}
|
||||
return res.Body(), 200, nil
|
||||
}
|
@ -12,7 +12,6 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
streamPkg "github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
hash_extend "github.com/OpenListTeam/OpenList/v4/pkg/utils/hash"
|
||||
"github.com/go-resty/resty/v2"
|
||||
@ -213,11 +212,15 @@ func (d *PikPak) Remove(ctx context.Context, obj model.Obj) error {
|
||||
}
|
||||
|
||||
func (d *PikPak) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
|
||||
sha1Str := stream.GetHash().GetHash(hash_extend.GCID)
|
||||
|
||||
hi := stream.GetHash()
|
||||
sha1Str := hi.GetHash(hash_extend.GCID)
|
||||
if len(sha1Str) < hash_extend.GCID.Width {
|
||||
var err error
|
||||
_, sha1Str, err = streamPkg.CacheFullAndHash(stream, &up, hash_extend.GCID, stream.GetSize())
|
||||
tFile, err := stream.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sha1Str, err = utils.HashFile(hash_extend.GCID, tFile, stream.GetSize())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -438,19 +438,20 @@ func (d *PikPak) UploadByOSS(ctx context.Context, params *S3Params, s model.File
|
||||
}
|
||||
|
||||
func (d *PikPak) UploadByMultipart(ctx context.Context, params *S3Params, fileSize int64, s model.FileStreamer, up driver.UpdateProgress) error {
|
||||
tmpF, err := s.CacheFullAndWriter(&up, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
chunks []oss.FileChunk
|
||||
parts []oss.UploadPart
|
||||
imur oss.InitiateMultipartUploadResult
|
||||
ossClient *oss.Client
|
||||
bucket *oss.Bucket
|
||||
err error
|
||||
)
|
||||
|
||||
tmpF, err := s.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ossClient, err = oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
streamPkg "github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/go-resty/resty/v2"
|
||||
)
|
||||
@ -157,7 +158,9 @@ func (d *QuarkOpen) Put(ctx context.Context, dstDir model.Obj, stream model.File
|
||||
}
|
||||
|
||||
if len(writers) > 0 {
|
||||
_, err := stream.CacheFullAndWriter(&up, io.MultiWriter(writers...))
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, err := streamPkg.CacheFullInTempFileAndWriter(stream, cacheFileProgress, io.MultiWriter(writers...))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
streamPkg "github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/go-resty/resty/v2"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -143,7 +144,9 @@ func (d *QuarkOrUC) Put(ctx context.Context, dstDir model.Obj, stream model.File
|
||||
}
|
||||
|
||||
if len(writers) > 0 {
|
||||
_, err := stream.CacheFullAndWriter(&up, io.MultiWriter(writers...))
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, err := streamPkg.CacheFullInTempFileAndWriter(stream, cacheFileProgress, io.MultiWriter(writers...))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -149,18 +149,12 @@ func (d *QuarkOrUC) getTranscodingLink(file model.Obj) (*model.Link, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, info := range resp.Data.VideoList {
|
||||
if info.VideoInfo.URL != "" {
|
||||
return &model.Link{
|
||||
URL: info.VideoInfo.URL,
|
||||
ContentLength: info.VideoInfo.Size,
|
||||
Concurrency: 3,
|
||||
PartSize: 10 * utils.MB,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("no link found")
|
||||
return &model.Link{
|
||||
URL: resp.Data.VideoList[0].VideoInfo.URL,
|
||||
ContentLength: resp.Data.VideoList[0].VideoInfo.Size,
|
||||
Concurrency: 3,
|
||||
PartSize: 10 * utils.MB,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *QuarkOrUC) upPre(file model.FileStreamer, parentId string) (UpPreResp, error) {
|
||||
|
@ -228,18 +228,12 @@ func (d *QuarkUCTV) getTranscodingLink(ctx context.Context, file model.Obj) (*mo
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, info := range fileLink.Data.VideoInfo {
|
||||
if info.URL != "" {
|
||||
return &model.Link{
|
||||
URL: info.URL,
|
||||
ContentLength: info.Size,
|
||||
Concurrency: 3,
|
||||
PartSize: 10 * utils.MB,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("no link found")
|
||||
return &model.Link{
|
||||
URL: fileLink.Data.VideoInfo[0].URL,
|
||||
Concurrency: 3,
|
||||
PartSize: 10 * utils.MB,
|
||||
ContentLength: fileLink.Data.VideoInfo[0].Size,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *QuarkUCTV) getDownloadLink(ctx context.Context, file model.Obj) (*model.Link, error) {
|
||||
|
@ -173,9 +173,8 @@ func (d *Strm) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*
|
||||
}, nil
|
||||
}
|
||||
|
||||
resultLink := *link
|
||||
resultLink.SyncClosers = utils.NewSyncClosers(link)
|
||||
return &resultLink, nil
|
||||
// 没有修改link的字段,可直接返回
|
||||
return link, nil
|
||||
}
|
||||
|
||||
var _ driver.Driver = (*Strm)(nil)
|
||||
|
@ -1,137 +0,0 @@
|
||||
package teldrive
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
func NewCopyManager(ctx context.Context, concurrent int, d *Teldrive) *CopyManager {
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
return &CopyManager{
|
||||
TaskChan: make(chan CopyTask, concurrent*2),
|
||||
Sem: semaphore.NewWeighted(int64(concurrent)),
|
||||
G: g,
|
||||
Ctx: ctx,
|
||||
d: d,
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *CopyManager) startWorkers() {
|
||||
workerCount := cap(cm.TaskChan) / 2
|
||||
for i := 0; i < workerCount; i++ {
|
||||
cm.G.Go(func() error {
|
||||
return cm.worker()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *CopyManager) worker() error {
|
||||
for {
|
||||
select {
|
||||
case task, ok := <-cm.TaskChan:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := cm.Sem.Acquire(cm.Ctx, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
err = cm.processFile(task)
|
||||
|
||||
cm.Sem.Release(1)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("task processing failed: %w", err)
|
||||
}
|
||||
|
||||
case <-cm.Ctx.Done():
|
||||
return cm.Ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *CopyManager) generateTasks(ctx context.Context, srcObj, dstDir model.Obj) error {
|
||||
if srcObj.IsDir() {
|
||||
return cm.generateFolderTasks(ctx, srcObj, dstDir)
|
||||
} else {
|
||||
// add single file task directly
|
||||
select {
|
||||
case cm.TaskChan <- CopyTask{SrcObj: srcObj, DstDir: dstDir}:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *CopyManager) generateFolderTasks(ctx context.Context, srcDir, dstDir model.Obj) error {
|
||||
objs, err := cm.d.List(ctx, srcDir, model.ListArgs{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list directory %s: %w", srcDir.GetPath(), err)
|
||||
}
|
||||
|
||||
err = cm.d.MakeDir(cm.Ctx, dstDir, srcDir.GetName())
|
||||
if err != nil || len(objs) == 0 {
|
||||
return err
|
||||
}
|
||||
newDstDir := &model.Object{
|
||||
ID: dstDir.GetID(),
|
||||
Path: dstDir.GetPath() + "/" + srcDir.GetName(),
|
||||
Name: srcDir.GetName(),
|
||||
IsFolder: true,
|
||||
}
|
||||
|
||||
for _, file := range objs {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
srcFile := &model.Object{
|
||||
ID: file.GetID(),
|
||||
Path: srcDir.GetPath() + "/" + file.GetName(),
|
||||
Name: file.GetName(),
|
||||
IsFolder: file.IsDir(),
|
||||
}
|
||||
|
||||
// 递归生成任务
|
||||
if err := cm.generateTasks(ctx, srcFile, newDstDir); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *CopyManager) processFile(task CopyTask) error {
|
||||
return cm.copySingleFile(cm.Ctx, task.SrcObj, task.DstDir)
|
||||
}
|
||||
|
||||
func (cm *CopyManager) copySingleFile(ctx context.Context, srcObj, dstDir model.Obj) error {
|
||||
// `override copy mode` should delete the existing file
|
||||
if obj, err := cm.d.getFile(dstDir.GetPath(), srcObj.GetName(), srcObj.IsDir()); err == nil {
|
||||
if err := cm.d.Remove(ctx, obj); err != nil {
|
||||
return fmt.Errorf("failed to remove existing file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Do copy
|
||||
return cm.d.request(http.MethodPost, "/api/files/{id}/copy", func(req *resty.Request) {
|
||||
req.SetPathParam("id", srcObj.GetID())
|
||||
req.SetBody(base.Json{
|
||||
"newName": srcObj.GetName(),
|
||||
"destination": dstDir.GetPath(),
|
||||
})
|
||||
}, nil)
|
||||
}
|
@ -1,217 +0,0 @@
|
||||
package teldrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type Teldrive struct {
|
||||
model.Storage
|
||||
Addition
|
||||
}
|
||||
|
||||
func (d *Teldrive) Config() driver.Config {
|
||||
return config
|
||||
}
|
||||
|
||||
func (d *Teldrive) GetAddition() driver.Additional {
|
||||
return &d.Addition
|
||||
}
|
||||
|
||||
func (d *Teldrive) Init(ctx context.Context) error {
|
||||
d.Address = strings.TrimSuffix(d.Address, "/")
|
||||
if d.Cookie == "" || !strings.HasPrefix(d.Cookie, "access_token=") {
|
||||
return fmt.Errorf("cookie must start with 'access_token='")
|
||||
}
|
||||
if d.UploadConcurrency == 0 {
|
||||
d.UploadConcurrency = 4
|
||||
}
|
||||
if d.ChunkSize == 0 {
|
||||
d.ChunkSize = 10
|
||||
}
|
||||
|
||||
op.MustSaveDriverStorage(d)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) Drop(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
|
||||
var listResp ListResp
|
||||
err := d.request(http.MethodGet, "/api/files", func(req *resty.Request) {
|
||||
req.SetQueryParams(map[string]string{
|
||||
"path": dir.GetPath(),
|
||||
"limit": "1000", // overide default 500, TODO pagination
|
||||
})
|
||||
}, &listResp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return utils.SliceConvert(listResp.Items, func(src Object) (model.Obj, error) {
|
||||
return &model.Object{
|
||||
ID: src.ID,
|
||||
Name: src.Name,
|
||||
Size: func() int64 {
|
||||
if src.Type == "folder" {
|
||||
return 0
|
||||
}
|
||||
return src.Size
|
||||
}(),
|
||||
IsFolder: src.Type == "folder",
|
||||
Modified: src.UpdatedAt,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Teldrive) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
|
||||
if d.UseShareLink {
|
||||
shareObj, err := d.getShareFileById(file.GetID())
|
||||
if err != nil || shareObj == nil {
|
||||
if err := d.createShareFile(file.GetID()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
shareObj, err = d.getShareFileById(file.GetID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &model.Link{
|
||||
URL: d.Address + "/api/shares/" + url.PathEscape(shareObj.Id) + "/files/" + url.PathEscape(file.GetID()) + "/" + url.PathEscape(file.GetName()),
|
||||
}, nil
|
||||
}
|
||||
return &model.Link{
|
||||
URL: d.Address + "/api/files/" + url.PathEscape(file.GetID()) + "/" + url.PathEscape(file.GetName()),
|
||||
Header: http.Header{
|
||||
"Cookie": {d.Cookie},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error {
|
||||
return d.request(http.MethodPost, "/api/files/mkdir", func(req *resty.Request) {
|
||||
req.SetBody(map[string]interface{}{
|
||||
"path": parentDir.GetPath() + "/" + dirName,
|
||||
})
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (d *Teldrive) Move(ctx context.Context, srcObj, dstDir model.Obj) error {
|
||||
body := base.Json{
|
||||
"ids": []string{srcObj.GetID()},
|
||||
"destinationParent": dstDir.GetID(),
|
||||
}
|
||||
return d.request(http.MethodPost, "/api/files/move", func(req *resty.Request) {
|
||||
req.SetBody(body)
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (d *Teldrive) Rename(ctx context.Context, srcObj model.Obj, newName string) error {
|
||||
body := base.Json{
|
||||
"name": newName,
|
||||
}
|
||||
return d.request(http.MethodPatch, "/api/files/{id}", func(req *resty.Request) {
|
||||
req.SetPathParam("id", srcObj.GetID())
|
||||
req.SetBody(body)
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (d *Teldrive) Copy(ctx context.Context, srcObj, dstDir model.Obj) error {
|
||||
copyConcurrentLimit := 4
|
||||
copyManager := NewCopyManager(ctx, copyConcurrentLimit, d)
|
||||
copyManager.startWorkers()
|
||||
copyManager.G.Go(func() error {
|
||||
defer close(copyManager.TaskChan)
|
||||
return copyManager.generateTasks(ctx, srcObj, dstDir)
|
||||
})
|
||||
return copyManager.G.Wait()
|
||||
}
|
||||
|
||||
func (d *Teldrive) Remove(ctx context.Context, obj model.Obj) error {
|
||||
body := base.Json{
|
||||
"ids": []string{obj.GetID()},
|
||||
}
|
||||
return d.request(http.MethodPost, "/api/files/delete", func(req *resty.Request) {
|
||||
req.SetBody(body)
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (d *Teldrive) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error {
|
||||
fileId := uuid.New().String()
|
||||
chunkSizeInMB := d.ChunkSize
|
||||
chunkSize := chunkSizeInMB * 1024 * 1024 // Convert MB to bytes
|
||||
totalSize := file.GetSize()
|
||||
totalParts := int(math.Ceil(float64(totalSize) / float64(chunkSize)))
|
||||
maxRetried := 3
|
||||
|
||||
// delete the upload task when finished or failed
|
||||
defer func() {
|
||||
_ = d.request(http.MethodDelete, "/api/uploads/{id}", func(req *resty.Request) {
|
||||
req.SetPathParam("id", fileId)
|
||||
}, nil)
|
||||
}()
|
||||
|
||||
if obj, err := d.getFile(dstDir.GetPath(), file.GetName(), file.IsDir()); err == nil {
|
||||
if err = d.Remove(ctx, obj); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// start the upload process
|
||||
if err := d.request(http.MethodGet, "/api/uploads/fileId", func(req *resty.Request) {
|
||||
req.SetPathParam("id", fileId)
|
||||
}, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
if totalSize == 0 {
|
||||
return d.touch(file.GetName(), dstDir.GetPath())
|
||||
}
|
||||
|
||||
if totalParts <= 1 {
|
||||
return d.doSingleUpload(ctx, dstDir, file, up, totalParts, chunkSize, fileId)
|
||||
}
|
||||
|
||||
return d.doMultiUpload(ctx, dstDir, file, up, maxRetried, totalParts, chunkSize, fileId)
|
||||
}
|
||||
|
||||
func (d *Teldrive) GetArchiveMeta(ctx context.Context, obj model.Obj, args model.ArchiveArgs) (model.ArchiveMeta, error) {
|
||||
// TODO get archive file meta-info, return errs.NotImplement to use an internal archive tool, optional
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
func (d *Teldrive) ListArchive(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) ([]model.Obj, error) {
|
||||
// TODO list args.InnerPath in the archive obj, return errs.NotImplement to use an internal archive tool, optional
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
func (d *Teldrive) Extract(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) (*model.Link, error) {
|
||||
// TODO return link of file args.InnerPath in the archive obj, return errs.NotImplement to use an internal archive tool, optional
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
func (d *Teldrive) ArchiveDecompress(ctx context.Context, srcObj, dstDir model.Obj, args model.ArchiveDecompressArgs) ([]model.Obj, error) {
|
||||
// TODO extract args.InnerPath path in the archive srcObj to the dstDir location, optional
|
||||
// a folder with the same name as the archive file needs to be created to store the extracted results if args.PutIntoNewDir
|
||||
// return errs.NotImplement to use an internal archive tool
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
//func (d *Teldrive) Other(ctx context.Context, args model.OtherArgs) (interface{}, error) {
|
||||
// return nil, errs.NotSupport
|
||||
//}
|
||||
|
||||
var _ driver.Driver = (*Teldrive)(nil)
|
@ -1,26 +0,0 @@
|
||||
package teldrive
|
||||
|
||||
import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
)
|
||||
|
||||
type Addition struct {
|
||||
driver.RootPath
|
||||
Address string `json:"url" required:"true"`
|
||||
Cookie string `json:"cookie" type:"string" required:"true" help:"access_token=xxx"`
|
||||
UseShareLink bool `json:"use_share_link" type:"bool" default:"false" help:"Create share link when getting link to support 302. If disabled, you need to enable web proxy."`
|
||||
ChunkSize int64 `json:"chunk_size" type:"number" default:"10" help:"Chunk size in MiB"`
|
||||
UploadConcurrency int64 `json:"upload_concurrency" type:"number" default:"4" help:"Concurrency upload requests"`
|
||||
}
|
||||
|
||||
var config = driver.Config{
|
||||
Name: "Teldrive",
|
||||
DefaultRoot: "/",
|
||||
}
|
||||
|
||||
func init() {
|
||||
op.RegisterDriver(func() driver.Driver {
|
||||
return &Teldrive{}
|
||||
})
|
||||
}
|
@ -1,77 +0,0 @@
|
||||
package teldrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
type ErrResp struct {
|
||||
Code int `json:"code"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type Object struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
MimeType string `json:"mimeType"`
|
||||
Category string `json:"category,omitempty"`
|
||||
ParentId string `json:"parentId"`
|
||||
Size int64 `json:"size"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
}
|
||||
|
||||
type ListResp struct {
|
||||
Items []Object `json:"items"`
|
||||
Meta struct {
|
||||
Count int `json:"count"`
|
||||
TotalPages int `json:"totalPages"`
|
||||
CurrentPage int `json:"currentPage"`
|
||||
} `json:"meta"`
|
||||
}
|
||||
|
||||
type FilePart struct {
|
||||
Name string `json:"name"`
|
||||
PartId int `json:"partId"`
|
||||
PartNo int `json:"partNo"`
|
||||
ChannelId int `json:"channelId"`
|
||||
Size int `json:"size"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
Salt string `json:"salt"`
|
||||
}
|
||||
|
||||
type chunkTask struct {
|
||||
chunkIdx int
|
||||
fileName string
|
||||
chunkSize int64
|
||||
reader *stream.SectionReader
|
||||
ss *stream.StreamSectionReader
|
||||
}
|
||||
|
||||
type CopyManager struct {
|
||||
TaskChan chan CopyTask
|
||||
Sem *semaphore.Weighted
|
||||
G *errgroup.Group
|
||||
Ctx context.Context
|
||||
d *Teldrive
|
||||
}
|
||||
|
||||
type CopyTask struct {
|
||||
SrcObj model.Obj
|
||||
DstDir model.Obj
|
||||
}
|
||||
|
||||
type ShareObj struct {
|
||||
Id string `json:"id"`
|
||||
Protected bool `json:"protected"`
|
||||
UserId int `json:"userId"`
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
ExpiresAt time.Time `json:"expiresAt"`
|
||||
}
|
@ -1,373 +0,0 @@
|
||||
package teldrive
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/avast/retry-go"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
// create empty file
|
||||
func (d *Teldrive) touch(name, path string) error {
|
||||
uploadBody := base.Json{
|
||||
"name": name,
|
||||
"type": "file",
|
||||
"path": path,
|
||||
}
|
||||
if err := d.request(http.MethodPost, "/api/files", func(req *resty.Request) {
|
||||
req.SetBody(uploadBody)
|
||||
}, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) createFileOnUploadSuccess(name, id, path string, uploadedFileParts []FilePart, totalSize int64) error {
|
||||
remoteFileParts, err := d.getFilePart(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// check if the uploaded file parts match the remote file parts
|
||||
if len(remoteFileParts) != len(uploadedFileParts) {
|
||||
return fmt.Errorf("[Teldrive] file parts count mismatch: expected %d, got %d", len(uploadedFileParts), len(remoteFileParts))
|
||||
}
|
||||
formatParts := make([]base.Json, 0)
|
||||
for _, p := range remoteFileParts {
|
||||
formatParts = append(formatParts, base.Json{
|
||||
"id": p.PartId,
|
||||
"salt": p.Salt,
|
||||
})
|
||||
}
|
||||
uploadBody := base.Json{
|
||||
"name": name,
|
||||
"type": "file",
|
||||
"path": path,
|
||||
"parts": formatParts,
|
||||
"size": totalSize,
|
||||
}
|
||||
// create file here
|
||||
if err := d.request(http.MethodPost, "/api/files", func(req *resty.Request) {
|
||||
req.SetBody(uploadBody)
|
||||
}, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) checkFilePartExist(fileId string, partId int) (FilePart, error) {
|
||||
var uploadedParts []FilePart
|
||||
var filePart FilePart
|
||||
|
||||
if err := d.request(http.MethodGet, "/api/uploads/{id}", func(req *resty.Request) {
|
||||
req.SetPathParam("id", fileId)
|
||||
}, &uploadedParts); err != nil {
|
||||
return filePart, err
|
||||
}
|
||||
|
||||
for _, part := range uploadedParts {
|
||||
if part.PartId == partId {
|
||||
return part, nil
|
||||
}
|
||||
}
|
||||
|
||||
return filePart, nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) getFilePart(fileId string) ([]FilePart, error) {
|
||||
var uploadedParts []FilePart
|
||||
if err := d.request(http.MethodGet, "/api/uploads/{id}", func(req *resty.Request) {
|
||||
req.SetPathParam("id", fileId)
|
||||
}, &uploadedParts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return uploadedParts, nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) singleUploadRequest(fileId string, callback base.ReqCallback, resp interface{}) error {
|
||||
url := d.Address + "/api/uploads/" + fileId
|
||||
client := resty.New().SetTimeout(0)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
req := client.R().
|
||||
SetContext(ctx)
|
||||
req.SetHeader("Cookie", d.Cookie)
|
||||
req.SetHeader("Content-Type", "application/octet-stream")
|
||||
req.SetContentLength(true)
|
||||
req.AddRetryCondition(func(r *resty.Response, err error) bool {
|
||||
return false
|
||||
})
|
||||
if callback != nil {
|
||||
callback(req)
|
||||
}
|
||||
if resp != nil {
|
||||
req.SetResult(resp)
|
||||
}
|
||||
var e ErrResp
|
||||
req.SetError(&e)
|
||||
_req, err := req.Execute(http.MethodPost, url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _req.IsError() {
|
||||
return &e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) doSingleUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up model.UpdateProgress,
|
||||
totalParts int, chunkSize int64, fileId string) error {
|
||||
|
||||
totalSize := file.GetSize()
|
||||
var fileParts []FilePart
|
||||
var uploaded int64 = 0
|
||||
ss, err := stream.NewStreamSectionReader(file, int(totalSize), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for uploaded < totalSize {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
curChunkSize := min(totalSize-uploaded, chunkSize)
|
||||
rd, err := ss.GetSectionReader(uploaded, curChunkSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filePart := &FilePart{}
|
||||
if err := retry.Do(func() error {
|
||||
|
||||
if _, err := rd.Seek(0, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := d.singleUploadRequest(fileId, func(req *resty.Request) {
|
||||
uploadParams := map[string]string{
|
||||
"partName": func() string {
|
||||
digits := len(fmt.Sprintf("%d", totalParts))
|
||||
return file.GetName() + fmt.Sprintf(".%0*d", digits, 1)
|
||||
}(),
|
||||
"partNo": strconv.Itoa(1),
|
||||
"fileName": file.GetName(),
|
||||
}
|
||||
req.SetQueryParams(uploadParams)
|
||||
req.SetBody(driver.NewLimitedUploadStream(ctx, rd))
|
||||
req.SetHeader("Content-Length", strconv.FormatInt(curChunkSize, 10))
|
||||
}, filePart); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if filePart.Name != "" {
|
||||
fileParts = append(fileParts, *filePart)
|
||||
uploaded += curChunkSize
|
||||
up(float64(uploaded) / float64(totalSize))
|
||||
ss.FreeSectionReader(rd)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return d.createFileOnUploadSuccess(file.GetName(), fileId, dstDir.GetPath(), fileParts, totalSize)
|
||||
}
|
||||
|
||||
func (d *Teldrive) doMultiUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up model.UpdateProgress,
|
||||
maxRetried, totalParts int, chunkSize int64, fileId string) error {
|
||||
|
||||
concurrent := d.UploadConcurrency
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
sem := semaphore.NewWeighted(int64(concurrent))
|
||||
chunkChan := make(chan chunkTask, concurrent*2)
|
||||
resultChan := make(chan FilePart, concurrent)
|
||||
totalSize := file.GetSize()
|
||||
|
||||
ss, err := stream.NewStreamSectionReader(file, int(totalSize), &up)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ssLock := sync.Mutex{}
|
||||
g.Go(func() error {
|
||||
defer close(chunkChan)
|
||||
|
||||
chunkIdx := 0
|
||||
for chunkIdx < totalParts {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
offset := int64(chunkIdx) * chunkSize
|
||||
curChunkSize := min(totalSize-offset, chunkSize)
|
||||
|
||||
ssLock.Lock()
|
||||
reader, err := ss.GetSectionReader(offset, curChunkSize)
|
||||
ssLock.Unlock()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
task := chunkTask{
|
||||
chunkIdx: chunkIdx + 1,
|
||||
chunkSize: curChunkSize,
|
||||
fileName: file.GetName(),
|
||||
reader: reader,
|
||||
ss: ss,
|
||||
}
|
||||
// freeSectionReader will be called in d.uploadSingleChunk
|
||||
select {
|
||||
case chunkChan <- task:
|
||||
chunkIdx++
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
for i := 0; i < int(concurrent); i++ {
|
||||
g.Go(func() error {
|
||||
for task := range chunkChan {
|
||||
if err := sem.Acquire(ctx, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
filePart, err := d.uploadSingleChunk(ctx, fileId, task, totalParts, maxRetried)
|
||||
sem.Release(1)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("upload chunk %d failed: %w", task.chunkIdx, err)
|
||||
}
|
||||
|
||||
select {
|
||||
case resultChan <- *filePart:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
var fileParts []FilePart
|
||||
var collectErr error
|
||||
collectDone := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer close(collectDone)
|
||||
fileParts = make([]FilePart, 0, totalParts)
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- g.Wait()
|
||||
close(resultChan)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case filePart, ok := <-resultChan:
|
||||
if !ok {
|
||||
collectErr = <-done
|
||||
return
|
||||
}
|
||||
fileParts = append(fileParts, filePart)
|
||||
case err := <-done:
|
||||
collectErr = err
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
<-collectDone
|
||||
|
||||
if collectErr != nil {
|
||||
return fmt.Errorf("multi-upload failed: %w", collectErr)
|
||||
}
|
||||
sort.Slice(fileParts, func(i, j int) bool {
|
||||
return fileParts[i].PartNo < fileParts[j].PartNo
|
||||
})
|
||||
|
||||
return d.createFileOnUploadSuccess(file.GetName(), fileId, dstDir.GetPath(), fileParts, totalSize)
|
||||
}
|
||||
|
||||
func (d *Teldrive) uploadSingleChunk(ctx context.Context, fileId string, task chunkTask, totalParts, maxRetried int) (*FilePart, error) {
|
||||
filePart := &FilePart{}
|
||||
retryCount := 0
|
||||
defer task.ss.FreeSectionReader(task.reader)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if existingPart, err := d.checkFilePartExist(fileId, task.chunkIdx); err == nil && existingPart.Name != "" {
|
||||
return &existingPart, nil
|
||||
}
|
||||
|
||||
err := d.singleUploadRequest(fileId, func(req *resty.Request) {
|
||||
uploadParams := map[string]string{
|
||||
"partName": func() string {
|
||||
digits := len(fmt.Sprintf("%d", totalParts))
|
||||
return task.fileName + fmt.Sprintf(".%0*d", digits, task.chunkIdx)
|
||||
}(),
|
||||
"partNo": strconv.Itoa(task.chunkIdx),
|
||||
"fileName": task.fileName,
|
||||
}
|
||||
req.SetQueryParams(uploadParams)
|
||||
req.SetBody(driver.NewLimitedUploadStream(ctx, task.reader))
|
||||
req.SetHeader("Content-Length", strconv.Itoa(int(task.chunkSize)))
|
||||
}, filePart)
|
||||
|
||||
if err == nil {
|
||||
return filePart, nil
|
||||
}
|
||||
|
||||
if retryCount >= maxRetried {
|
||||
return nil, fmt.Errorf("upload failed after %d retries: %w", maxRetried, err)
|
||||
}
|
||||
|
||||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
||||
continue
|
||||
}
|
||||
|
||||
retryCount++
|
||||
utils.Log.Errorf("[Teldrive] upload error: %v, retrying %d times", err, retryCount)
|
||||
|
||||
backoffDuration := time.Duration(retryCount*retryCount) * time.Second
|
||||
if backoffDuration > 30*time.Second {
|
||||
backoffDuration = 30 * time.Second
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(backoffDuration):
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
@ -1,109 +0,0 @@
|
||||
package teldrive
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/drivers/base"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/go-resty/resty/v2"
|
||||
)
|
||||
|
||||
// do others that not defined in Driver interface
|
||||
|
||||
func (d *Teldrive) request(method string, pathname string, callback base.ReqCallback, resp interface{}) error {
|
||||
url := d.Address + pathname
|
||||
req := base.RestyClient.R()
|
||||
req.SetHeader("Cookie", d.Cookie)
|
||||
if callback != nil {
|
||||
callback(req)
|
||||
}
|
||||
if resp != nil {
|
||||
req.SetResult(resp)
|
||||
}
|
||||
var e ErrResp
|
||||
req.SetError(&e)
|
||||
_req, err := req.Execute(method, url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _req.IsError() {
|
||||
return &e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) getFile(path, name string, isFolder bool) (model.Obj, error) {
|
||||
resp := &ListResp{}
|
||||
err := d.request(http.MethodGet, "/api/files", func(req *resty.Request) {
|
||||
req.SetQueryParams(map[string]string{
|
||||
"path": path,
|
||||
"name": name,
|
||||
"type": func() string {
|
||||
if isFolder {
|
||||
return "folder"
|
||||
}
|
||||
return "file"
|
||||
}(),
|
||||
"operation": "find",
|
||||
})
|
||||
}, resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(resp.Items) == 0 {
|
||||
return nil, fmt.Errorf("file not found: %s/%s", path, name)
|
||||
}
|
||||
obj := resp.Items[0]
|
||||
return &model.Object{
|
||||
ID: obj.ID,
|
||||
Name: obj.Name,
|
||||
Size: obj.Size,
|
||||
IsFolder: obj.Type == "folder",
|
||||
}, err
|
||||
}
|
||||
|
||||
func (err *ErrResp) Error() string {
|
||||
if err == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return fmt.Sprintf("[Teldrive] message:%s Error code:%d", err.Message, err.Code)
|
||||
}
|
||||
|
||||
func (d *Teldrive) createShareFile(fileId string) error {
|
||||
var errResp ErrResp
|
||||
if err := d.request(http.MethodPost, "/api/files/{id}/share", func(req *resty.Request) {
|
||||
req.SetPathParam("id", fileId)
|
||||
req.SetBody(base.Json{
|
||||
"expiresAt": getDateTime(),
|
||||
})
|
||||
}, &errResp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if errResp.Message != "" {
|
||||
return &errResp
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Teldrive) getShareFileById(fileId string) (*ShareObj, error) {
|
||||
var shareObj ShareObj
|
||||
if err := d.request(http.MethodGet, "/api/files/{id}/share", func(req *resty.Request) {
|
||||
req.SetPathParam("id", fileId)
|
||||
}, &shareObj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &shareObj, nil
|
||||
}
|
||||
|
||||
func getDateTime() string {
|
||||
now := time.Now().UTC()
|
||||
formattedWithMs := now.Add(time.Hour * 1).Format("2006-01-02T15:04:05.000Z")
|
||||
return formattedWithMs
|
||||
}
|
@ -132,7 +132,7 @@ func (d *Terabox) Remove(ctx context.Context, obj model.Obj) error {
|
||||
func (d *Terabox) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
|
||||
resp, err := base.RestyClient.R().
|
||||
SetContext(ctx).
|
||||
Get("https://" + d.url_domain_prefix + "-data.terabox.com/rest/2.0/pcs/file?method=locateupload")
|
||||
Get("https://d.terabox.com/rest/2.0/pcs/file?method=locateupload")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -179,7 +179,7 @@ func (d *Terabox) Put(ctx context.Context, dstDir model.Obj, stream model.FileSt
|
||||
}
|
||||
|
||||
// upload chunks
|
||||
tempFile, err := stream.CacheFullAndWriter(&up, nil)
|
||||
tempFile, err := stream.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -371,7 +371,9 @@ func (xc *XunLeiCommon) Put(ctx context.Context, dstDir model.Obj, file model.Fi
|
||||
gcid := file.GetHash().GetHash(hash_extend.GCID)
|
||||
var err error
|
||||
if len(gcid) < hash_extend.GCID.Width {
|
||||
_, gcid, err = stream.CacheFullAndHash(file, &up, hash_extend.GCID, file.GetSize())
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, gcid, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, hash_extend.GCID, file.GetSize())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -491,7 +491,9 @@ func (xc *XunLeiBrowserCommon) Put(ctx context.Context, dstDir model.Obj, stream
|
||||
gcid := stream.GetHash().GetHash(hash_extend.GCID)
|
||||
var err error
|
||||
if len(gcid) < hash_extend.GCID.Width {
|
||||
_, gcid, err = streamPkg.CacheFullAndHash(stream, &up, hash_extend.GCID, stream.GetSize())
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, gcid, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, hash_extend.GCID, stream.GetSize())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -372,7 +372,9 @@ func (xc *XunLeiXCommon) Put(ctx context.Context, dstDir model.Obj, file model.F
|
||||
gcid := file.GetHash().GetHash(hash_extend.GCID)
|
||||
var err error
|
||||
if len(gcid) < hash_extend.GCID.Width {
|
||||
_, gcid, err = stream.CacheFullAndHash(file, &up, hash_extend.GCID, file.GetSize())
|
||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||
_, gcid, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, hash_extend.GCID, file.GetSize())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ type Addition struct {
|
||||
// driver.RootPath
|
||||
// driver.RootID
|
||||
// define other
|
||||
UrlStructure string `json:"url_structure" type:"text" required:"true" default:"https://raw.githubusercontent.com/OpenListTeam/OpenList/main/README.md\nhttps://raw.githubusercontent.com/OpenListTeam/OpenList/main/README_cn.md\nfolder:\n CONTRIBUTING.md:1635:https://raw.githubusercontent.com/OpenListTeam/OpenList/main/CONTRIBUTING.md\n CODE_OF_CONDUCT.md:2093:https://raw.githubusercontent.com/OpenListTeam/OpenList/main/CODE_OF_CONDUCT.md" help:"structure:FolderName:\n [FileName:][FileSize:][Modified:]Url"`
|
||||
UrlStructure string `json:"url_structure" type:"text" required:"true" default:"https://cdn.oplist.org/gh/OpenListTeam/OpenList/README.md\nhttps://cdn.oplist.org/gh/OpenListTeam/OpenList/README_cn.md\nfolder:\n CONTRIBUTING.md:1635:https://cdn.oplist.org/gh/OpenListTeam/OpenList/CONTRIBUTING.md\n CODE_OF_CONDUCT.md:2093:https://cdn.oplist.org/gh/OpenListTeam/OpenList/CODE_OF_CONDUCT.md" help:"structure:FolderName:\n [FileName:][FileSize:][Modified:]Url"`
|
||||
HeadSize bool `json:"head_size" type:"bool" default:"false" help:"Use head method to get file size, but it may be failed."`
|
||||
Writable bool `json:"writable" type:"bool" default:"false"`
|
||||
}
|
||||
|
@ -317,7 +317,7 @@ func (d *WeiYun) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
|
||||
if folder, ok = dstDir.(*Folder); !ok {
|
||||
return nil, errs.NotSupport
|
||||
}
|
||||
file, err := stream.CacheFullAndWriter(&up, nil)
|
||||
file, err := stream.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -36,6 +36,5 @@ func (d *Wopan) getSpaceType() string {
|
||||
|
||||
// 20230607214351
|
||||
func getTime(str string) (time.Time, error) {
|
||||
loc := time.FixedZone("UTC+8", 8*60*60)
|
||||
return time.ParseInLocation("20060102150405", str, loc)
|
||||
return time.Parse("20060102150405", str)
|
||||
}
|
||||
|
@ -5,35 +5,10 @@ umask ${UMASK}
|
||||
if [ "$1" = "version" ]; then
|
||||
./openlist version
|
||||
else
|
||||
# Check file of /opt/openlist/data permissions for current user
|
||||
# 检查当前用户是否有当前目录的写和执行权限
|
||||
if [ -d ./data ]; then
|
||||
if ! [ -w ./data ] || ! [ -x ./data ]; then
|
||||
cat <<EOF
|
||||
Error: Current user does not have write and/or execute permissions for the ./data directory: $(pwd)/data
|
||||
Please visit https://doc.oplist.org/guide/installation/docker#for-version-after-v4-1-0 for more information.
|
||||
错误:当前用户没有 ./data 目录($(pwd)/data)的写和/或执行权限。
|
||||
请访问 https://doc.oplist.org/guide/installation/docker#v4-1-0-%E4%BB%A5%E5%90%8E%E7%89%88%E6%9C%AC 获取更多信息。
|
||||
Exiting...
|
||||
EOF
|
||||
exit 1
|
||||
fi
|
||||
if [ "$RUN_ARIA2" = "true" ]; then
|
||||
cp -a /opt/service/stop/aria2 /opt/service/start 2>/dev/null
|
||||
fi
|
||||
|
||||
# Define the target directory path for aria2 service
|
||||
ARIA2_DIR="/opt/service/start/aria2"
|
||||
if [ "$RUN_ARIA2" = "true" ]; then
|
||||
# If aria2 should run and target directory doesn't exist, copy it
|
||||
if [ ! -d "$ARIA2_DIR" ]; then
|
||||
mkdir -p "$ARIA2_DIR"
|
||||
cp -r /opt/service/stop/aria2/* "$ARIA2_DIR" 2>/dev/null
|
||||
fi
|
||||
runsvdir /opt/service/start &
|
||||
else
|
||||
# If aria2 should NOT run and target directory exists, remove it
|
||||
if [ -d "$ARIA2_DIR" ]; then
|
||||
rm -rf "$ARIA2_DIR"
|
||||
fi
|
||||
fi
|
||||
exec ./openlist server --no-prefix
|
||||
fi
|
||||
chown -R ${PUID}:${PGID} /opt
|
||||
exec su-exec ${PUID}:${PGID} runsvdir /opt/service/start
|
||||
fi
|
10
go.mod
10
go.mod
@ -11,7 +11,7 @@ require (
|
||||
github.com/OpenListTeam/times v0.1.0
|
||||
github.com/OpenListTeam/wopan-sdk-go v0.1.5
|
||||
github.com/ProtonMail/go-crypto v1.3.0
|
||||
github.com/SheltonZhu/115driver v1.1.1
|
||||
github.com/SheltonZhu/115driver v1.1.0
|
||||
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
|
||||
github.com/avast/retry-go v3.0.0+incompatible
|
||||
github.com/aws/aws-sdk-go v1.55.7
|
||||
@ -21,7 +21,7 @@ require (
|
||||
github.com/charmbracelet/bubbletea v1.3.6
|
||||
github.com/charmbracelet/lipgloss v1.1.0
|
||||
github.com/city404/v6-public-rpc-proto/go v0.0.0-20240817070657-90f8e24b653e
|
||||
github.com/coreos/go-oidc v2.3.0+incompatible
|
||||
github.com/coreos/go-oidc v2.4.0+incompatible
|
||||
github.com/deckarep/golang-set/v2 v2.8.0
|
||||
github.com/dhowden/tag v0.0.0-20240417053706-3d75831295e8
|
||||
github.com/disintegration/imaging v1.6.2
|
||||
@ -41,7 +41,7 @@ require (
|
||||
github.com/hirochachacha/go-smb2 v1.1.0
|
||||
github.com/ipfs/go-ipfs-api v0.7.0
|
||||
github.com/itsHenry35/gofakes3 v0.0.8
|
||||
github.com/jlaffaye/ftp v0.2.1-0.20250831012827-3f092e051c94
|
||||
github.com/jlaffaye/ftp v0.2.1-0.20240918233326-1b970516f5d3
|
||||
github.com/json-iterator/go v1.1.12
|
||||
github.com/kdomanski/iso9660 v0.4.0
|
||||
github.com/maruel/natural v1.1.1
|
||||
@ -58,7 +58,7 @@ require (
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/spf13/afero v1.14.0
|
||||
github.com/spf13/cobra v1.9.1
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5
|
||||
github.com/u2takey/ffmpeg-go v0.5.0
|
||||
github.com/upyun/go-sdk/v3 v3.0.4
|
||||
@ -254,7 +254,7 @@ require (
|
||||
github.com/yusufpapurcu/wmi v1.2.4 // indirect
|
||||
go.etcd.io/bbolt v1.4.0 // indirect
|
||||
golang.org/x/arch v0.18.0 // indirect
|
||||
golang.org/x/sync v0.16.0
|
||||
golang.org/x/sync v0.16.0 // indirect
|
||||
golang.org/x/sys v0.34.0 // indirect
|
||||
golang.org/x/term v0.33.0 // indirect
|
||||
golang.org/x/text v0.27.0
|
||||
|
10
go.sum
10
go.sum
@ -59,8 +59,8 @@ github.com/RoaringBitmap/roaring/v2 v2.4.5 h1:uGrrMreGjvAtTBobc0g5IrW1D5ldxDQYe2
|
||||
github.com/RoaringBitmap/roaring/v2 v2.4.5/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
|
||||
github.com/STARRY-S/zip v0.2.1 h1:pWBd4tuSGm3wtpoqRZZ2EAwOmcHK6XFf7bU9qcJXyFg=
|
||||
github.com/STARRY-S/zip v0.2.1/go.mod h1:xNvshLODWtC4EJ702g7cTYn13G53o1+X9BWnPFpcWV4=
|
||||
github.com/SheltonZhu/115driver v1.1.1 h1:9EMhe2ZJflGiAaZbYInw2jqxTcqZNF+DtVDsEy70aFU=
|
||||
github.com/SheltonZhu/115driver v1.1.1/go.mod h1:rKvNd4Y4OkXv1TMbr/SKjGdcvMQxh6AW5Tw9w0CJb7E=
|
||||
github.com/SheltonZhu/115driver v1.1.0 h1:kA8Vtu5JVWqqJFiTF06+HDb9zVEO6ZSdyjV5HsGx7Wg=
|
||||
github.com/SheltonZhu/115driver v1.1.0/go.mod h1:rKvNd4Y4OkXv1TMbr/SKjGdcvMQxh6AW5Tw9w0CJb7E=
|
||||
github.com/abbot/go-http-auth v0.4.0 h1:QjmvZ5gSC7jm3Zg54DqWE/T5m1t2AfDu6QlXJT0EVT0=
|
||||
github.com/abbot/go-http-auth v0.4.0/go.mod h1:Cz6ARTIzApMJDzh5bRMSUou6UMSp0IEXg9km/ci7TJM=
|
||||
github.com/aead/ecdh v0.2.0 h1:pYop54xVaq/CEREFEcukHRZfTdjiWvYIsZDXXrBapQQ=
|
||||
@ -205,6 +205,8 @@ github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJ
|
||||
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
|
||||
github.com/coreos/go-oidc v2.3.0+incompatible h1:+5vEsrgprdLjjQ9FzIKAzQz1wwPD+83hQRfUIPh7rO0=
|
||||
github.com/coreos/go-oidc v2.3.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
|
||||
github.com/coreos/go-oidc v2.4.0+incompatible h1:xjdlhLWXcINyUJgLQ9I76g7osgC2goiL6JDXS6Fegjk=
|
||||
github.com/coreos/go-oidc v2.4.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
|
||||
github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4=
|
||||
github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec=
|
||||
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
|
||||
@ -402,8 +404,6 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
||||
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||
github.com/jlaffaye/ftp v0.2.1-0.20240918233326-1b970516f5d3 h1:ZxO6Qr2GOXPdcW80Mcn3nemvilMPvpWqxrNfK2ZnNNs=
|
||||
github.com/jlaffaye/ftp v0.2.1-0.20240918233326-1b970516f5d3/go.mod h1:dvLUr/8Fs9a2OBrEnCC5duphbkz/k/mSy5OkXg3PAgI=
|
||||
github.com/jlaffaye/ftp v0.2.1-0.20250831012827-3f092e051c94 h1:sBUrMD4Gx91zDgzTqPCr3FqFs2+3wWX7lyUYIP/isuA=
|
||||
github.com/jlaffaye/ftp v0.2.1-0.20250831012827-3f092e051c94/go.mod h1:H1+whwD0Qe3YOunlXIWhh3rlvzW5cZfkMDYGQPg+KAM=
|
||||
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
|
||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
|
||||
@ -622,8 +622,6 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
|
||||
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5 h1:Sa+sR8aaAMFwxhXWENEnE6ZpqhZ9d7u1RT2722Rw6hc=
|
||||
github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5/go.mod h1:UdZiFUFu6e2WjjtjxivwXWcwc1N/8zgbkBR9QNucUOY=
|
||||
github.com/taruti/bytepool v0.0.0-20160310082835-5e3a9ea56543 h1:6Y51mutOvRGRx6KqyMNo//xk8B8o6zW9/RVmy1VamOs=
|
||||
|
@ -77,10 +77,6 @@ func InitConfig() {
|
||||
log.Fatalf("update config struct error: %+v", err)
|
||||
}
|
||||
}
|
||||
if !conf.Conf.Force {
|
||||
confFromEnv()
|
||||
}
|
||||
|
||||
if conf.Conf.MaxConcurrency > 0 {
|
||||
net.DefaultConcurrencyLimit = &net.ConcurrencyLimit{Limit: conf.Conf.MaxConcurrency}
|
||||
}
|
||||
@ -95,32 +91,26 @@ func InitConfig() {
|
||||
} else {
|
||||
conf.MaxBufferLimit = conf.Conf.MaxBufferLimit * utils.MB
|
||||
}
|
||||
log.Infof("max buffer limit: %dMB", conf.MaxBufferLimit/utils.MB)
|
||||
if conf.Conf.MmapThreshold > 0 {
|
||||
conf.MmapThreshold = conf.Conf.MmapThreshold * utils.MB
|
||||
} else {
|
||||
conf.MmapThreshold = 0
|
||||
log.Infof("max buffer limit: %d", conf.MaxBufferLimit)
|
||||
if !conf.Conf.Force {
|
||||
confFromEnv()
|
||||
}
|
||||
log.Infof("mmap threshold: %dMB", conf.Conf.MmapThreshold)
|
||||
|
||||
if len(conf.Conf.Log.Filter.Filters) == 0 {
|
||||
conf.Conf.Log.Filter.Enable = false
|
||||
}
|
||||
// convert abs path
|
||||
convertAbsPath := func(path *string) {
|
||||
if *path != "" && !filepath.IsAbs(*path) {
|
||||
if !filepath.IsAbs(*path) {
|
||||
*path = filepath.Join(pwd, *path)
|
||||
}
|
||||
}
|
||||
convertAbsPath(&conf.Conf.Database.DBFile)
|
||||
convertAbsPath(&conf.Conf.Scheme.CertFile)
|
||||
convertAbsPath(&conf.Conf.Scheme.KeyFile)
|
||||
convertAbsPath(&conf.Conf.Scheme.UnixFile)
|
||||
convertAbsPath(&conf.Conf.Log.Name)
|
||||
convertAbsPath(&conf.Conf.TempDir)
|
||||
convertAbsPath(&conf.Conf.BleveDir)
|
||||
convertAbsPath(&conf.Conf.DistDir)
|
||||
|
||||
convertAbsPath(&conf.Conf.Log.Name)
|
||||
convertAbsPath(&conf.Conf.Database.DBFile)
|
||||
if conf.Conf.DistDir != "" {
|
||||
convertAbsPath(&conf.Conf.DistDir)
|
||||
}
|
||||
err := os.MkdirAll(conf.Conf.TempDir, 0o777)
|
||||
if err != nil {
|
||||
log.Fatalf("create temp dir error: %+v", err)
|
||||
|
@ -107,11 +107,10 @@ func InitialSettings() []model.SettingItem {
|
||||
{Key: conf.AllowMounted, Value: "true", Type: conf.TypeBool, Group: model.SITE},
|
||||
{Key: conf.RobotsTxt, Value: "User-agent: *\nAllow: /", Type: conf.TypeText, Group: model.SITE},
|
||||
// style settings
|
||||
{Key: conf.Logo, Value: "https://res.oplist.org/logo/logo.svg", MigrationValue: "https://cdn.oplist.org/gh/OpenListTeam/Logo@main/logo.svg", Type: conf.TypeText, Group: model.STYLE},
|
||||
{Key: conf.Favicon, Value: "https://res.oplist.org/logo/logo.svg", MigrationValue: "https://cdn.oplist.org/gh/OpenListTeam/Logo@main/logo.svg", Type: conf.TypeString, Group: model.STYLE},
|
||||
{Key: conf.Logo, Value: "https://cdn.oplist.org/gh/OpenListTeam/Logo@main/logo.svg", Type: conf.TypeText, Group: model.STYLE},
|
||||
{Key: conf.Favicon, Value: "https://cdn.oplist.org/gh/OpenListTeam/Logo@main/logo.svg", Type: conf.TypeString, Group: model.STYLE},
|
||||
{Key: conf.MainColor, Value: "#1890ff", Type: conf.TypeString, Group: model.STYLE},
|
||||
{Key: "home_icon", Value: "🏠", Type: conf.TypeString, Group: model.STYLE},
|
||||
{Key: "share_icon", Value: "🎁", Type: conf.TypeString, Group: model.STYLE},
|
||||
{Key: "home_container", Value: "max_980px", Type: conf.TypeSelect, Options: "max_980px,hope_container", Group: model.STYLE},
|
||||
{Key: "settings_layout", Value: "list", Type: conf.TypeSelect, Options: "list,responsive", Group: model.STYLE},
|
||||
// preview settings
|
||||
@ -142,7 +141,7 @@ func InitialSettings() []model.SettingItem {
|
||||
// {Key: conf.PdfViewers, Value: `{
|
||||
// "pdf.js":"https://openlistteam.github.io/pdf.js/web/viewer.html?file=$url"
|
||||
//}`, Type: conf.TypeText, Group: model.PREVIEW},
|
||||
{Key: "audio_cover", Value: "https://res.oplist.org/logo/logo.svg", MigrationValue: "https://cdn.oplist.org/gh/OpenListTeam/Logo@main/logo.svg", Type: conf.TypeString, Group: model.PREVIEW},
|
||||
{Key: "audio_cover", Value: "https://cdn.oplist.org/gh/OpenListTeam/Logo@main/logo.svg", Type: conf.TypeString, Group: model.PREVIEW},
|
||||
{Key: conf.AudioAutoplay, Value: "true", Type: conf.TypeBool, Group: model.PREVIEW},
|
||||
{Key: conf.VideoAutoplay, Value: "true", Type: conf.TypeBool, Group: model.PREVIEW},
|
||||
{Key: conf.PreviewArchivesByDefault, Value: "true", Type: conf.TypeBool, Group: model.PREVIEW},
|
||||
@ -162,12 +161,8 @@ func InitialSettings() []model.SettingItem {
|
||||
{Key: conf.OcrApi, Value: "https://openlistteam-ocr-api-server.hf.space/ocr/file/json", MigrationValue: "https://api.example.com/ocr/file/json", Type: conf.TypeString, Group: model.GLOBAL}, // TODO: This can be replace by a community-hosted endpoint, see https://github.com/OpenListTeam/ocr_api_server
|
||||
{Key: conf.FilenameCharMapping, Value: `{"/": "|"}`, Type: conf.TypeText, Group: model.GLOBAL},
|
||||
{Key: conf.ForwardDirectLinkParams, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL},
|
||||
{Key: conf.IgnoreDirectLinkParams, Value: "sign,openlist_ts,raw", Type: conf.TypeString, Group: model.GLOBAL},
|
||||
{Key: conf.IgnoreDirectLinkParams, Value: "sign,openlist_ts", Type: conf.TypeString, Group: model.GLOBAL},
|
||||
{Key: conf.WebauthnLoginEnabled, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PUBLIC},
|
||||
{Key: conf.SharePreview, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PUBLIC},
|
||||
{Key: conf.ShareArchivePreview, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PUBLIC},
|
||||
{Key: conf.ShareForceProxy, Value: "true", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PRIVATE},
|
||||
{Key: conf.ShareSummaryContent, Value: "@{{creator}} shared {{#each files}}{{#if @first}}\"{{filename this}}\"{{/if}}{{#if @last}}{{#unless (eq @index 0)}} and {{@index}} more files{{/unless}}{{/if}}{{/each}} from {{site_title}}: {{base_url}}/@s/{{id}}{{#if pwd}} , the share code is {{pwd}}{{/if}}{{#if expires}}, please access before {{dateLocaleString expires}}.{{/if}}", Type: conf.TypeText, Group: model.GLOBAL, Flag: model.PUBLIC},
|
||||
|
||||
// single settings
|
||||
{Key: conf.Token, Value: token, Type: conf.TypeString, Group: model.SINGLE, Flag: model.PRIVATE},
|
||||
|
@ -33,8 +33,8 @@ func initUser() {
|
||||
Role: model.ADMIN,
|
||||
BasePath: "/",
|
||||
Authn: "[]",
|
||||
// 0(can see hidden) - 8(webdav read) & 12(can read archives) - 14(can share)
|
||||
Permission: 0x71FF,
|
||||
// 0(can see hidden) - 7(can remove) & 12(can read archives) - 13(can decompress archives)
|
||||
Permission: 0x31FF,
|
||||
}
|
||||
if err := op.CreateUser(admin); err != nil {
|
||||
panic(err)
|
||||
|
@ -120,7 +120,6 @@ type Config struct {
|
||||
Log LogConfig `json:"log" envPrefix:"LOG_"`
|
||||
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
|
||||
MaxBufferLimit int `json:"max_buffer_limitMB" env:"MAX_BUFFER_LIMIT_MB"`
|
||||
MmapThreshold int `json:"mmap_thresholdMB" env:"MMAP_THRESHOLD_MB"`
|
||||
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
|
||||
MaxConcurrency int `json:"max_concurrency" env:"MAX_CONCURRENCY"`
|
||||
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
|
||||
@ -177,7 +176,6 @@ func DefaultConfig(dataDir string) *Config {
|
||||
},
|
||||
},
|
||||
MaxBufferLimit: -1,
|
||||
MmapThreshold: 4,
|
||||
MaxConnections: 0,
|
||||
MaxConcurrency: 64,
|
||||
TlsInsecureSkipVerify: true,
|
||||
|
@ -33,7 +33,6 @@ const (
|
||||
PreviewArchivesByDefault = "preview_archives_by_default"
|
||||
ReadMeAutoRender = "readme_autorender"
|
||||
FilterReadMeScripts = "filter_readme_scripts"
|
||||
|
||||
// global
|
||||
HideFiles = "hide_files"
|
||||
CustomizeHead = "customize_head"
|
||||
@ -46,10 +45,6 @@ const (
|
||||
ForwardDirectLinkParams = "forward_direct_link_params"
|
||||
IgnoreDirectLinkParams = "ignore_direct_link_params"
|
||||
WebauthnLoginEnabled = "webauthn_login_enabled"
|
||||
SharePreview = "share_preview"
|
||||
ShareArchivePreview = "share_archive_preview"
|
||||
ShareForceProxy = "share_force_proxy"
|
||||
ShareSummaryContent = "share_summary_content"
|
||||
|
||||
// index
|
||||
SearchIndex = "search_index"
|
||||
@ -172,5 +167,4 @@ const (
|
||||
RequestHeaderKey
|
||||
UserAgentKey
|
||||
PathKey
|
||||
SharingIDKey
|
||||
)
|
||||
|
@ -25,10 +25,7 @@ var PrivacyReg []*regexp.Regexp
|
||||
var (
|
||||
// StoragesLoaded loaded success if empty
|
||||
StoragesLoaded = false
|
||||
// 单个Buffer最大限制
|
||||
MaxBufferLimit = 16 * 1024 * 1024
|
||||
// 超过该阈值的Buffer将使用 mmap 分配,可主动释放内存
|
||||
MmapThreshold = 4 * 1024 * 1024
|
||||
MaxBufferLimit int
|
||||
)
|
||||
var (
|
||||
RawIndexHtml string
|
||||
|
@ -12,7 +12,7 @@ var db *gorm.DB
|
||||
|
||||
func Init(d *gorm.DB) {
|
||||
db = d
|
||||
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey), new(model.SharingDB))
|
||||
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey))
|
||||
if err != nil {
|
||||
log.Fatalf("failed migrate database: %s", err.Error())
|
||||
}
|
||||
|
@ -1,62 +0,0 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils/random"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func GetSharingById(id string) (*model.SharingDB, error) {
|
||||
s := model.SharingDB{ID: id}
|
||||
if err := db.Where(s).First(&s).Error; err != nil {
|
||||
return nil, errors.Wrapf(err, "failed get sharing")
|
||||
}
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
func GetSharings(pageIndex, pageSize int) (sharings []model.SharingDB, count int64, err error) {
|
||||
sharingDB := db.Model(&model.SharingDB{})
|
||||
if err := sharingDB.Count(&count).Error; err != nil {
|
||||
return nil, 0, errors.Wrapf(err, "failed get sharings count")
|
||||
}
|
||||
if err := sharingDB.Order(columnName("id")).Offset((pageIndex - 1) * pageSize).Limit(pageSize).Find(&sharings).Error; err != nil {
|
||||
return nil, 0, errors.Wrapf(err, "failed get find sharings")
|
||||
}
|
||||
return sharings, count, nil
|
||||
}
|
||||
|
||||
func GetSharingsByCreatorId(creator uint, pageIndex, pageSize int) (sharings []model.SharingDB, count int64, err error) {
|
||||
sharingDB := db.Model(&model.SharingDB{})
|
||||
cond := model.SharingDB{CreatorId: creator}
|
||||
if err := sharingDB.Where(cond).Count(&count).Error; err != nil {
|
||||
return nil, 0, errors.Wrapf(err, "failed get sharings count")
|
||||
}
|
||||
if err := sharingDB.Where(cond).Order(columnName("id")).Offset((pageIndex - 1) * pageSize).Limit(pageSize).Find(&sharings).Error; err != nil {
|
||||
return nil, 0, errors.Wrapf(err, "failed get find sharings")
|
||||
}
|
||||
return sharings, count, nil
|
||||
}
|
||||
|
||||
func CreateSharing(s *model.SharingDB) (string, error) {
|
||||
id := random.String(8)
|
||||
for len(id) < 12 {
|
||||
old := model.SharingDB{
|
||||
ID: id,
|
||||
}
|
||||
if err := db.Where(old).First(&old).Error; err != nil {
|
||||
s.ID = id
|
||||
return id, errors.WithStack(db.Create(s).Error)
|
||||
}
|
||||
id += random.String(1)
|
||||
}
|
||||
return "", errors.New("failed find valid id")
|
||||
}
|
||||
|
||||
func UpdateSharing(s *model.SharingDB) error {
|
||||
return errors.WithStack(db.Save(s).Error)
|
||||
}
|
||||
|
||||
func DeleteSharingById(id string) error {
|
||||
s := model.SharingDB{ID: id}
|
||||
return errors.WithStack(db.Where(s).Delete(&s).Error)
|
||||
}
|
@ -23,10 +23,6 @@ var (
|
||||
UnknownArchiveFormat = errors.New("unknown archive format")
|
||||
WrongArchivePassword = errors.New("wrong archive password")
|
||||
DriverExtractNotSupported = errors.New("driver extraction not supported")
|
||||
|
||||
WrongShareCode = errors.New("wrong share code")
|
||||
InvalidSharing = errors.New("invalid sharing")
|
||||
SharingNotFound = errors.New("sharing not found")
|
||||
)
|
||||
|
||||
// NewErr wrap constant error with an extra message
|
||||
|
@ -70,25 +70,25 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
|
||||
}()
|
||||
var decompressUp model.UpdateProgress
|
||||
if t.CacheFull {
|
||||
total := int64(0)
|
||||
var total, cur int64 = 0, 0
|
||||
for _, s := range ss {
|
||||
total += s.GetSize()
|
||||
}
|
||||
t.SetTotalBytes(total)
|
||||
t.Status = "getting src object"
|
||||
part := 100 / float64(len(ss)+1)
|
||||
for i, s := range ss {
|
||||
if s.GetFile() != nil {
|
||||
continue
|
||||
for _, s := range ss {
|
||||
if s.GetFile() == nil {
|
||||
_, err = stream.CacheFullInTempFileAndWriter(s, func(p float64) {
|
||||
t.SetProgress((float64(cur) + float64(s.GetSize())*p/100.0) / float64(total))
|
||||
}, nil)
|
||||
}
|
||||
_, err = s.CacheFullAndWriter(nil, nil)
|
||||
cur += s.GetSize()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
t.SetProgress(float64(i+1) * part)
|
||||
}
|
||||
}
|
||||
decompressUp = model.UpdateProgressWithRange(t.SetProgress, 100-part, 100)
|
||||
t.SetProgress(100.0)
|
||||
decompressUp = func(_ float64) {}
|
||||
} else {
|
||||
decompressUp = t.SetProgress
|
||||
}
|
||||
|
@ -168,7 +168,7 @@ func GetStorage(path string, args *GetStoragesArgs) (driver.Driver, error) {
|
||||
func Other(ctx context.Context, args model.FsOtherArgs) (interface{}, error) {
|
||||
res, err := other(ctx, args)
|
||||
if err != nil {
|
||||
log.Errorf("failed get other %s: %+v", args.Path, err)
|
||||
log.Errorf("failed remove %s: %+v", args.Path, err)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer)
|
||||
return nil, errors.WithStack(errs.UploadNotSupported)
|
||||
}
|
||||
if file.NeedStore() {
|
||||
_, err := file.CacheFullAndWriter(nil, nil)
|
||||
_, err := file.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create temp file")
|
||||
}
|
||||
|
@ -77,26 +77,6 @@ type ArchiveDecompressArgs struct {
|
||||
PutIntoNewDir bool
|
||||
}
|
||||
|
||||
type SharingListArgs struct {
|
||||
Refresh bool
|
||||
Pwd string
|
||||
}
|
||||
|
||||
type SharingArchiveMetaArgs struct {
|
||||
ArchiveMetaArgs
|
||||
Pwd string
|
||||
}
|
||||
|
||||
type SharingArchiveListArgs struct {
|
||||
ArchiveListArgs
|
||||
Pwd string
|
||||
}
|
||||
|
||||
type SharingLinkArgs struct {
|
||||
Pwd string
|
||||
LinkArgs
|
||||
}
|
||||
|
||||
type RangeReaderIF interface {
|
||||
RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error)
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package model
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
@ -39,17 +40,16 @@ type FileStreamer interface {
|
||||
utils.ClosersIF
|
||||
Obj
|
||||
GetMimetype() string
|
||||
//SetReader(io.Reader)
|
||||
NeedStore() bool
|
||||
IsForceStreamUpload() bool
|
||||
GetExist() Obj
|
||||
SetExist(Obj)
|
||||
// for a non-seekable Stream, RangeRead supports peeking some data, and CacheFullAndWriter still works
|
||||
//for a non-seekable Stream, RangeRead supports peeking some data, and CacheFullInTempFile still works
|
||||
RangeRead(http_range.Range) (io.Reader, error)
|
||||
// for a non-seekable Stream, if Read is called, this function won't work.
|
||||
// caches the full Stream and writes it to writer (if provided, even if the stream is already cached).
|
||||
CacheFullAndWriter(up *UpdateProgress, writer io.Writer) (File, error)
|
||||
SetTmpFile(file File)
|
||||
// if the Stream is not a File and is not cached, returns nil.
|
||||
//for a non-seekable Stream, if Read is called, this function won't work
|
||||
CacheFullInTempFile() (File, error)
|
||||
SetTmpFile(r *os.File)
|
||||
GetFile() File
|
||||
}
|
||||
|
||||
|
@ -1,47 +0,0 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
|
||||
type SharingDB struct {
|
||||
ID string `json:"id" gorm:"type:char(12);primaryKey"`
|
||||
FilesRaw string `json:"-" gorm:"type:text"`
|
||||
Expires *time.Time `json:"expires"`
|
||||
Pwd string `json:"pwd"`
|
||||
Accessed int `json:"accessed"`
|
||||
MaxAccessed int `json:"max_accessed"`
|
||||
CreatorId uint `json:"-"`
|
||||
Disabled bool `json:"disabled"`
|
||||
Remark string `json:"remark"`
|
||||
Readme string `json:"readme" gorm:"type:text"`
|
||||
Header string `json:"header" gorm:"type:text"`
|
||||
Sort
|
||||
}
|
||||
|
||||
type Sharing struct {
|
||||
*SharingDB
|
||||
Files []string `json:"files"`
|
||||
Creator *User `json:"-"`
|
||||
}
|
||||
|
||||
func (s *Sharing) Valid() bool {
|
||||
if s.Disabled {
|
||||
return false
|
||||
}
|
||||
if s.MaxAccessed > 0 && s.Accessed >= s.MaxAccessed {
|
||||
return false
|
||||
}
|
||||
if len(s.Files) == 0 {
|
||||
return false
|
||||
}
|
||||
if !s.Creator.CanShare() {
|
||||
return false
|
||||
}
|
||||
if s.Expires != nil && !s.Expires.IsZero() && s.Expires.Before(time.Now()) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *Sharing) Verify(pwd string) bool {
|
||||
return s.Pwd == "" || s.Pwd == pwd
|
||||
}
|
@ -54,7 +54,6 @@ type User struct {
|
||||
// 11: ftp/sftp write
|
||||
// 12: can read archives
|
||||
// 13: can decompress archives
|
||||
// 14: can share
|
||||
Permission int32 `json:"permission"`
|
||||
OtpSecret string `json:"-"`
|
||||
SsoID string `json:"sso_id"` // unique by sso platform
|
||||
@ -146,10 +145,6 @@ func (u *User) CanDecompress() bool {
|
||||
return (u.Permission>>13)&1 == 1
|
||||
}
|
||||
|
||||
func (u *User) CanShare() bool {
|
||||
return (u.Permission>>14)&1 == 1
|
||||
}
|
||||
|
||||
func (u *User) JoinPath(reqPath string) (string, error) {
|
||||
return utils.JoinBasePath(u.BasePath, reqPath)
|
||||
}
|
||||
@ -190,5 +185,5 @@ func (u *User) WebAuthnCredentials() []webauthn.Credential {
|
||||
}
|
||||
|
||||
func (u *User) WebAuthnIcon() string {
|
||||
return "https://res.oplist.org/logo/logo.svg"
|
||||
return "https://cdn.oplist.org/gh/OpenListTeam/Logo@main/logo.svg"
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -14,7 +15,6 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/rclone/rclone/lib/mmap"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
||||
"github.com/aws/aws-sdk-go/aws/awsutil"
|
||||
@ -255,10 +255,7 @@ func (d *downloader) sendChunkTask(newConcurrency bool) error {
|
||||
finalSize += firstSize - minSize
|
||||
}
|
||||
}
|
||||
err := buf.Reset(int(finalSize))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
buf.Reset(int(finalSize))
|
||||
ch := chunk{
|
||||
start: d.pos,
|
||||
size: finalSize,
|
||||
@ -648,13 +645,11 @@ func (mr MultiReadCloser) Close() error {
|
||||
}
|
||||
|
||||
type Buf struct {
|
||||
size int //expected size
|
||||
ctx context.Context
|
||||
offR int
|
||||
offW int
|
||||
rw sync.Mutex
|
||||
buf []byte
|
||||
mmap bool
|
||||
buffer *bytes.Buffer
|
||||
size int //expected size
|
||||
ctx context.Context
|
||||
off int
|
||||
rw sync.Mutex
|
||||
|
||||
readSignal chan struct{}
|
||||
readPending bool
|
||||
@ -663,100 +658,76 @@ type Buf struct {
|
||||
// NewBuf is a buffer that can have 1 read & 1 write at the same time.
|
||||
// when read is faster write, immediately feed data to read after written
|
||||
func NewBuf(ctx context.Context, maxSize int) *Buf {
|
||||
br := &Buf{
|
||||
ctx: ctx,
|
||||
size: maxSize,
|
||||
return &Buf{
|
||||
ctx: ctx,
|
||||
buffer: bytes.NewBuffer(make([]byte, 0, maxSize)),
|
||||
size: maxSize,
|
||||
|
||||
readSignal: make(chan struct{}, 1),
|
||||
}
|
||||
if conf.MmapThreshold > 0 && maxSize >= conf.MmapThreshold {
|
||||
m, err := mmap.Alloc(maxSize)
|
||||
if err == nil {
|
||||
br.buf = m
|
||||
br.mmap = true
|
||||
return br
|
||||
}
|
||||
}
|
||||
br.buf = make([]byte, maxSize)
|
||||
return br
|
||||
}
|
||||
|
||||
func (br *Buf) Reset(size int) error {
|
||||
func (br *Buf) Reset(size int) {
|
||||
br.rw.Lock()
|
||||
defer br.rw.Unlock()
|
||||
if br.buf == nil {
|
||||
return io.ErrClosedPipe
|
||||
}
|
||||
if size > cap(br.buf) {
|
||||
return fmt.Errorf("reset size %d exceeds max size %d", size, cap(br.buf))
|
||||
if br.buffer == nil {
|
||||
return
|
||||
}
|
||||
br.buffer.Reset()
|
||||
br.size = size
|
||||
br.offR = 0
|
||||
br.offW = 0
|
||||
return nil
|
||||
br.off = 0
|
||||
}
|
||||
|
||||
func (br *Buf) Read(p []byte) (int, error) {
|
||||
func (br *Buf) Read(p []byte) (n int, err error) {
|
||||
if err := br.ctx.Err(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if len(p) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
if br.offR >= br.size {
|
||||
if br.off >= br.size {
|
||||
return 0, io.EOF
|
||||
}
|
||||
for {
|
||||
br.rw.Lock()
|
||||
if br.buf == nil {
|
||||
br.rw.Unlock()
|
||||
return 0, io.ErrClosedPipe
|
||||
if br.buffer != nil {
|
||||
n, err = br.buffer.Read(p)
|
||||
} else {
|
||||
err = io.ErrClosedPipe
|
||||
}
|
||||
|
||||
if br.offW < br.offR {
|
||||
if err != nil && err != io.EOF {
|
||||
br.rw.Unlock()
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
return
|
||||
}
|
||||
if br.offW == br.offR {
|
||||
br.readPending = true
|
||||
if n > 0 {
|
||||
br.off += n
|
||||
br.rw.Unlock()
|
||||
select {
|
||||
case <-br.ctx.Done():
|
||||
return 0, br.ctx.Err()
|
||||
case _, ok := <-br.readSignal:
|
||||
if !ok {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
continue
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
n := copy(p, br.buf[br.offR:br.offW])
|
||||
br.offR += n
|
||||
br.readPending = true
|
||||
br.rw.Unlock()
|
||||
if n < len(p) && br.offR >= br.size {
|
||||
return n, io.EOF
|
||||
// n==0, err==io.EOF
|
||||
select {
|
||||
case <-br.ctx.Done():
|
||||
return 0, br.ctx.Err()
|
||||
case _, ok := <-br.readSignal:
|
||||
if !ok {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
continue
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (br *Buf) Write(p []byte) (int, error) {
|
||||
func (br *Buf) Write(p []byte) (n int, err error) {
|
||||
if err := br.ctx.Err(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if len(p) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
br.rw.Lock()
|
||||
defer br.rw.Unlock()
|
||||
if br.buf == nil {
|
||||
if br.buffer == nil {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
if br.offW >= br.size {
|
||||
return 0, io.ErrShortWrite
|
||||
}
|
||||
n := copy(br.buf[br.offW:], p[:min(br.size-br.offW, len(p))])
|
||||
br.offW += n
|
||||
n, err = br.buffer.Write(p)
|
||||
if br.readPending {
|
||||
br.readPending = false
|
||||
select {
|
||||
@ -764,21 +735,12 @@ func (br *Buf) Write(p []byte) (int, error) {
|
||||
default:
|
||||
}
|
||||
}
|
||||
if n < len(p) {
|
||||
return n, io.ErrShortWrite
|
||||
}
|
||||
return n, nil
|
||||
return
|
||||
}
|
||||
|
||||
func (br *Buf) Close() error {
|
||||
func (br *Buf) Close() {
|
||||
br.rw.Lock()
|
||||
defer br.rw.Unlock()
|
||||
var err error
|
||||
if br.mmap {
|
||||
err = mmap.Free(br.buf)
|
||||
br.mmap = false
|
||||
}
|
||||
br.buf = nil
|
||||
br.buffer = nil
|
||||
close(br.readSignal)
|
||||
return err
|
||||
}
|
||||
|
@ -1,139 +0,0 @@
|
||||
package op
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
stdpath "path"
|
||||
"strings"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/db"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/singleflight"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/OpenListTeam/go-cache"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func makeJoined(sdb []model.SharingDB) []model.Sharing {
|
||||
creator := make(map[uint]*model.User)
|
||||
return utils.MustSliceConvert(sdb, func(s model.SharingDB) model.Sharing {
|
||||
var c *model.User
|
||||
var ok bool
|
||||
if c, ok = creator[s.CreatorId]; !ok {
|
||||
var err error
|
||||
if c, err = GetUserById(s.CreatorId); err != nil {
|
||||
c = nil
|
||||
} else {
|
||||
creator[s.CreatorId] = c
|
||||
}
|
||||
}
|
||||
var files []string
|
||||
if err := utils.Json.UnmarshalFromString(s.FilesRaw, &files); err != nil {
|
||||
files = make([]string, 0)
|
||||
}
|
||||
return model.Sharing{
|
||||
SharingDB: &s,
|
||||
Files: files,
|
||||
Creator: c,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
var sharingCache = cache.NewMemCache(cache.WithShards[*model.Sharing](8))
|
||||
var sharingG singleflight.Group[*model.Sharing]
|
||||
|
||||
func GetSharingById(id string, refresh ...bool) (*model.Sharing, error) {
|
||||
if !utils.IsBool(refresh...) {
|
||||
if sharing, ok := sharingCache.Get(id); ok {
|
||||
log.Debugf("use cache when get sharing %s", id)
|
||||
return sharing, nil
|
||||
}
|
||||
}
|
||||
sharing, err, _ := sharingG.Do(id, func() (*model.Sharing, error) {
|
||||
s, err := db.GetSharingById(id)
|
||||
if err != nil {
|
||||
return nil, errors.WithMessagef(err, "failed get sharing [%s]", id)
|
||||
}
|
||||
creator, err := GetUserById(s.CreatorId)
|
||||
if err != nil {
|
||||
return nil, errors.WithMessagef(err, "failed get sharing creator [%s]", id)
|
||||
}
|
||||
var files []string
|
||||
if err = utils.Json.UnmarshalFromString(s.FilesRaw, &files); err != nil {
|
||||
files = make([]string, 0)
|
||||
}
|
||||
return &model.Sharing{
|
||||
SharingDB: s,
|
||||
Files: files,
|
||||
Creator: creator,
|
||||
}, nil
|
||||
})
|
||||
return sharing, err
|
||||
}
|
||||
|
||||
func GetSharings(pageIndex, pageSize int) ([]model.Sharing, int64, error) {
|
||||
s, cnt, err := db.GetSharings(pageIndex, pageSize)
|
||||
if err != nil {
|
||||
return nil, 0, errors.WithStack(err)
|
||||
}
|
||||
return makeJoined(s), cnt, nil
|
||||
}
|
||||
|
||||
func GetSharingsByCreatorId(userId uint, pageIndex, pageSize int) ([]model.Sharing, int64, error) {
|
||||
s, cnt, err := db.GetSharingsByCreatorId(userId, pageIndex, pageSize)
|
||||
if err != nil {
|
||||
return nil, 0, errors.WithStack(err)
|
||||
}
|
||||
return makeJoined(s), cnt, nil
|
||||
}
|
||||
|
||||
func GetSharingUnwrapPath(sharing *model.Sharing, path string) (unwrapPath string, err error) {
|
||||
if len(sharing.Files) == 0 {
|
||||
return "", errors.New("cannot get actual path of an invalid sharing")
|
||||
}
|
||||
if len(sharing.Files) == 1 {
|
||||
return stdpath.Join(sharing.Files[0], path), nil
|
||||
}
|
||||
path = utils.FixAndCleanPath(path)[1:]
|
||||
if len(path) == 0 {
|
||||
return "", errors.New("cannot get actual path of a sharing root path")
|
||||
}
|
||||
mapPath := ""
|
||||
child, rest, _ := strings.Cut(path, "/")
|
||||
for _, c := range sharing.Files {
|
||||
if child == stdpath.Base(c) {
|
||||
mapPath = c
|
||||
break
|
||||
}
|
||||
}
|
||||
if mapPath == "" {
|
||||
return "", fmt.Errorf("failed find child [%s] of sharing [%s]", child, sharing.ID)
|
||||
}
|
||||
return stdpath.Join(mapPath, rest), nil
|
||||
}
|
||||
|
||||
func CreateSharing(sharing *model.Sharing) (id string, err error) {
|
||||
sharing.CreatorId = sharing.Creator.ID
|
||||
sharing.FilesRaw, err = utils.Json.MarshalToString(utils.MustSliceConvert(sharing.Files, utils.FixAndCleanPath))
|
||||
if err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
return db.CreateSharing(sharing.SharingDB)
|
||||
}
|
||||
|
||||
func UpdateSharing(sharing *model.Sharing, skipMarshal ...bool) (err error) {
|
||||
if !utils.IsBool(skipMarshal...) {
|
||||
sharing.CreatorId = sharing.Creator.ID
|
||||
sharing.FilesRaw, err = utils.Json.MarshalToString(utils.MustSliceConvert(sharing.Files, utils.FixAndCleanPath))
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
sharingCache.Del(sharing.ID)
|
||||
return db.UpdateSharing(sharing.SharingDB)
|
||||
}
|
||||
|
||||
func DeleteSharing(sid string) error {
|
||||
sharingCache.Del(sid)
|
||||
return db.DeleteSharingById(sid)
|
||||
}
|
@ -1,65 +0,0 @@
|
||||
package sharing
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func archiveMeta(ctx context.Context, sid, path string, args model.SharingArchiveMetaArgs) (*model.Sharing, *model.ArchiveMetaProvider, error) {
|
||||
sharing, err := op.GetSharingById(sid, args.Refresh)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(errs.SharingNotFound)
|
||||
}
|
||||
if !sharing.Valid() {
|
||||
return sharing, nil, errors.WithStack(errs.InvalidSharing)
|
||||
}
|
||||
if !sharing.Verify(args.Pwd) {
|
||||
return sharing, nil, errors.WithStack(errs.WrongShareCode)
|
||||
}
|
||||
path = utils.FixAndCleanPath(path)
|
||||
if len(sharing.Files) == 1 || path != "/" {
|
||||
unwrapPath, err := op.GetSharingUnwrapPath(sharing, path)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithMessage(err, "failed get sharing unwrap path")
|
||||
}
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(unwrapPath)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithMessage(err, "failed get sharing file")
|
||||
}
|
||||
obj, err := op.GetArchiveMeta(ctx, storage, actualPath, args.ArchiveMetaArgs)
|
||||
return sharing, obj, err
|
||||
}
|
||||
return nil, nil, errors.New("cannot get sharing root archive meta")
|
||||
}
|
||||
|
||||
func archiveList(ctx context.Context, sid, path string, args model.SharingArchiveListArgs) (*model.Sharing, []model.Obj, error) {
|
||||
sharing, err := op.GetSharingById(sid, args.Refresh)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(errs.SharingNotFound)
|
||||
}
|
||||
if !sharing.Valid() {
|
||||
return sharing, nil, errors.WithStack(errs.InvalidSharing)
|
||||
}
|
||||
if !sharing.Verify(args.Pwd) {
|
||||
return sharing, nil, errors.WithStack(errs.WrongShareCode)
|
||||
}
|
||||
path = utils.FixAndCleanPath(path)
|
||||
if len(sharing.Files) == 1 || path != "/" {
|
||||
unwrapPath, err := op.GetSharingUnwrapPath(sharing, path)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithMessage(err, "failed get sharing unwrap path")
|
||||
}
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(unwrapPath)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithMessage(err, "failed get sharing file")
|
||||
}
|
||||
obj, err := op.ListArchive(ctx, storage, actualPath, args.ArchiveListArgs)
|
||||
return sharing, obj, err
|
||||
}
|
||||
return nil, nil, errors.New("cannot get sharing root archive list")
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
package sharing
|
||||
|
||||
import (
|
||||
"context"
|
||||
stdpath "path"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func get(ctx context.Context, sid, path string, args model.SharingListArgs) (*model.Sharing, model.Obj, error) {
|
||||
sharing, err := op.GetSharingById(sid, args.Refresh)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(errs.SharingNotFound)
|
||||
}
|
||||
if !sharing.Valid() {
|
||||
return sharing, nil, errors.WithStack(errs.InvalidSharing)
|
||||
}
|
||||
if !sharing.Verify(args.Pwd) {
|
||||
return sharing, nil, errors.WithStack(errs.WrongShareCode)
|
||||
}
|
||||
path = utils.FixAndCleanPath(path)
|
||||
if len(sharing.Files) == 1 || path != "/" {
|
||||
unwrapPath, err := op.GetSharingUnwrapPath(sharing, path)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithMessage(err, "failed get sharing unwrap path")
|
||||
}
|
||||
if unwrapPath != "/" {
|
||||
virtualFiles := op.GetStorageVirtualFilesByPath(stdpath.Dir(unwrapPath))
|
||||
for _, f := range virtualFiles {
|
||||
if f.GetName() == stdpath.Base(unwrapPath) {
|
||||
return sharing, f, nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return sharing, &model.Object{
|
||||
Name: sid,
|
||||
Size: 0,
|
||||
Modified: time.Time{},
|
||||
IsFolder: true,
|
||||
}, nil
|
||||
}
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(unwrapPath)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithMessage(err, "failed get sharing file")
|
||||
}
|
||||
obj, err := op.Get(ctx, storage, actualPath)
|
||||
return sharing, obj, err
|
||||
}
|
||||
return sharing, &model.Object{
|
||||
Name: sid,
|
||||
Size: 0,
|
||||
Modified: time.Time{},
|
||||
IsFolder: true,
|
||||
}, nil
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
package sharing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/OpenListTeam/OpenList/v4/server/common"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func link(ctx context.Context, sid, path string, args *LinkArgs) (*model.Sharing, *model.Link, model.Obj, error) {
|
||||
sharing, err := op.GetSharingById(sid, args.SharingListArgs.Refresh)
|
||||
if err != nil {
|
||||
return nil, nil, nil, errors.WithStack(errs.SharingNotFound)
|
||||
}
|
||||
if !sharing.Valid() {
|
||||
return sharing, nil, nil, errors.WithStack(errs.InvalidSharing)
|
||||
}
|
||||
if !sharing.Verify(args.Pwd) {
|
||||
return sharing, nil, nil, errors.WithStack(errs.WrongShareCode)
|
||||
}
|
||||
path = utils.FixAndCleanPath(path)
|
||||
if len(sharing.Files) == 1 || path != "/" {
|
||||
unwrapPath, err := op.GetSharingUnwrapPath(sharing, path)
|
||||
if err != nil {
|
||||
return nil, nil, nil, errors.WithMessage(err, "failed get sharing unwrap path")
|
||||
}
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(unwrapPath)
|
||||
if err != nil {
|
||||
return nil, nil, nil, errors.WithMessage(err, "failed get sharing link")
|
||||
}
|
||||
l, obj, err := op.Link(ctx, storage, actualPath, args.LinkArgs)
|
||||
if err != nil {
|
||||
return nil, nil, nil, errors.WithMessage(err, "failed get sharing link")
|
||||
}
|
||||
if l.URL != "" && !strings.HasPrefix(l.URL, "http://") && !strings.HasPrefix(l.URL, "https://") {
|
||||
l.URL = common.GetApiUrl(ctx) + l.URL
|
||||
}
|
||||
return sharing, l, obj, nil
|
||||
}
|
||||
return nil, nil, nil, errors.New("cannot get sharing root link")
|
||||
}
|
@ -1,83 +0,0 @@
|
||||
package sharing
|
||||
|
||||
import (
|
||||
"context"
|
||||
stdpath "path"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func list(ctx context.Context, sid, path string, args model.SharingListArgs) (*model.Sharing, []model.Obj, error) {
|
||||
sharing, err := op.GetSharingById(sid, args.Refresh)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(errs.SharingNotFound)
|
||||
}
|
||||
if !sharing.Valid() {
|
||||
return sharing, nil, errors.WithStack(errs.InvalidSharing)
|
||||
}
|
||||
if !sharing.Verify(args.Pwd) {
|
||||
return sharing, nil, errors.WithStack(errs.WrongShareCode)
|
||||
}
|
||||
path = utils.FixAndCleanPath(path)
|
||||
if len(sharing.Files) == 1 || path != "/" {
|
||||
unwrapPath, err := op.GetSharingUnwrapPath(sharing, path)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithMessage(err, "failed get sharing unwrap path")
|
||||
}
|
||||
virtualFiles := op.GetStorageVirtualFilesByPath(unwrapPath)
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(unwrapPath)
|
||||
if err != nil && len(virtualFiles) == 0 {
|
||||
return nil, nil, errors.WithMessage(err, "failed list sharing")
|
||||
}
|
||||
var objs []model.Obj
|
||||
if storage != nil {
|
||||
objs, err = op.List(ctx, storage, actualPath, model.ListArgs{
|
||||
Refresh: args.Refresh,
|
||||
ReqPath: stdpath.Join(sid, path),
|
||||
})
|
||||
if err != nil && len(virtualFiles) == 0 {
|
||||
return nil, nil, errors.WithMessage(err, "failed list sharing")
|
||||
}
|
||||
}
|
||||
om := model.NewObjMerge()
|
||||
objs = om.Merge(objs, virtualFiles...)
|
||||
model.SortFiles(objs, sharing.OrderBy, sharing.OrderDirection)
|
||||
model.ExtractFolder(objs, sharing.ExtractFolder)
|
||||
return sharing, objs, nil
|
||||
}
|
||||
objs := make([]model.Obj, 0, len(sharing.Files))
|
||||
for _, f := range sharing.Files {
|
||||
if f != "/" {
|
||||
isVf := false
|
||||
virtualFiles := op.GetStorageVirtualFilesByPath(stdpath.Dir(f))
|
||||
for _, vf := range virtualFiles {
|
||||
if vf.GetName() == stdpath.Base(f) {
|
||||
objs = append(objs, vf)
|
||||
isVf = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if isVf {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(f)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
obj, err := op.Get(ctx, storage, actualPath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
objs = append(objs, obj)
|
||||
}
|
||||
model.SortFiles(objs, sharing.OrderBy, sharing.OrderDirection)
|
||||
model.ExtractFolder(objs, sharing.ExtractFolder)
|
||||
return sharing, objs, nil
|
||||
}
|
@ -1,58 +0,0 @@
|
||||
package sharing
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func List(ctx context.Context, sid, path string, args model.SharingListArgs) (*model.Sharing, []model.Obj, error) {
|
||||
sharing, res, err := list(ctx, sid, path, args)
|
||||
if err != nil {
|
||||
log.Errorf("failed list sharing %s/%s: %+v", sid, path, err)
|
||||
return nil, nil, err
|
||||
}
|
||||
return sharing, res, nil
|
||||
}
|
||||
|
||||
func Get(ctx context.Context, sid, path string, args model.SharingListArgs) (*model.Sharing, model.Obj, error) {
|
||||
sharing, res, err := get(ctx, sid, path, args)
|
||||
if err != nil {
|
||||
log.Warnf("failed get sharing %s/%s: %s", sid, path, err)
|
||||
return nil, nil, err
|
||||
}
|
||||
return sharing, res, nil
|
||||
}
|
||||
|
||||
func ArchiveMeta(ctx context.Context, sid, path string, args model.SharingArchiveMetaArgs) (*model.Sharing, *model.ArchiveMetaProvider, error) {
|
||||
sharing, res, err := archiveMeta(ctx, sid, path, args)
|
||||
if err != nil {
|
||||
log.Warnf("failed get sharing archive meta %s/%s: %s", sid, path, err)
|
||||
return nil, nil, err
|
||||
}
|
||||
return sharing, res, nil
|
||||
}
|
||||
|
||||
func ArchiveList(ctx context.Context, sid, path string, args model.SharingArchiveListArgs) (*model.Sharing, []model.Obj, error) {
|
||||
sharing, res, err := archiveList(ctx, sid, path, args)
|
||||
if err != nil {
|
||||
log.Warnf("failed list sharing archive %s/%s: %s", sid, path, err)
|
||||
return nil, nil, err
|
||||
}
|
||||
return sharing, res, nil
|
||||
}
|
||||
|
||||
type LinkArgs struct {
|
||||
model.SharingListArgs
|
||||
model.LinkArgs
|
||||
}
|
||||
|
||||
func Link(ctx context.Context, sid, path string, args *LinkArgs) (*model.Sharing, *model.Link, model.Obj, error) {
|
||||
sharing, res, file, err := link(ctx, sid, path, args)
|
||||
if err != nil {
|
||||
log.Errorf("failed get sharing link %s/%s: %+v", sid, path, err)
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
return sharing, res, file, nil
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -12,10 +13,8 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/buffer"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/rclone/rclone/lib/mmap"
|
||||
"go4.org/readerutil"
|
||||
)
|
||||
|
||||
@ -28,19 +27,13 @@ type FileStream struct {
|
||||
ForceStreamUpload bool
|
||||
Exist model.Obj //the file existed in the destination, we can reuse some info since we wil overwrite it
|
||||
utils.Closers
|
||||
|
||||
tmpFile model.File //if present, tmpFile has full content, it will be deleted at last
|
||||
peekBuff *buffer.Reader
|
||||
size int64
|
||||
oriReader io.Reader // the original reader, used for caching
|
||||
tmpFile *os.File //if present, tmpFile has full content, it will be deleted at last
|
||||
peekBuff *bytes.Reader
|
||||
}
|
||||
|
||||
func (f *FileStream) GetSize() int64 {
|
||||
if f.size > 0 {
|
||||
return f.size
|
||||
}
|
||||
if file, ok := f.tmpFile.(*os.File); ok {
|
||||
info, err := file.Stat()
|
||||
if f.tmpFile != nil {
|
||||
info, err := f.tmpFile.Stat()
|
||||
if err == nil {
|
||||
return info.Size()
|
||||
}
|
||||
@ -61,20 +54,16 @@ func (f *FileStream) IsForceStreamUpload() bool {
|
||||
}
|
||||
|
||||
func (f *FileStream) Close() error {
|
||||
if f.peekBuff != nil {
|
||||
f.peekBuff.Reset()
|
||||
f.peekBuff = nil
|
||||
}
|
||||
|
||||
var err1, err2 error
|
||||
|
||||
err1 = f.Closers.Close()
|
||||
if errors.Is(err1, os.ErrClosed) {
|
||||
err1 = nil
|
||||
}
|
||||
if file, ok := f.tmpFile.(*os.File); ok {
|
||||
err2 = os.RemoveAll(file.Name())
|
||||
if f.tmpFile != nil {
|
||||
err2 = os.RemoveAll(f.tmpFile.Name())
|
||||
if err2 != nil {
|
||||
err2 = errs.NewErr(err2, "failed to remove tmpFile [%s]", file.Name())
|
||||
err2 = errs.NewErr(err2, "failed to remove tmpFile [%s]", f.tmpFile.Name())
|
||||
} else {
|
||||
f.tmpFile = nil
|
||||
}
|
||||
@ -90,55 +79,20 @@ func (f *FileStream) SetExist(obj model.Obj) {
|
||||
f.Exist = obj
|
||||
}
|
||||
|
||||
// CacheFullAndWriter save all data into tmpFile or memory.
|
||||
// It's not thread-safe!
|
||||
func (f *FileStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writer) (model.File, error) {
|
||||
if cache := f.GetFile(); cache != nil {
|
||||
if writer == nil {
|
||||
return cache, nil
|
||||
}
|
||||
_, err := cache.Seek(0, io.SeekStart)
|
||||
if err == nil {
|
||||
reader := f.Reader
|
||||
if up != nil {
|
||||
cacheProgress := model.UpdateProgressWithRange(*up, 0, 50)
|
||||
*up = model.UpdateProgressWithRange(*up, 50, 100)
|
||||
reader = &ReaderUpdatingProgress{
|
||||
Reader: &SimpleReaderWithSize{
|
||||
Reader: reader,
|
||||
Size: f.GetSize(),
|
||||
},
|
||||
UpdateProgress: cacheProgress,
|
||||
}
|
||||
}
|
||||
_, err = utils.CopyWithBuffer(writer, reader)
|
||||
if err == nil {
|
||||
_, err = cache.Seek(0, io.SeekStart)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cache, nil
|
||||
// CacheFullInTempFile save all data into tmpFile. Not recommended since it wears disk,
|
||||
// and can't start upload until the file is written. It's not thread-safe!
|
||||
func (f *FileStream) CacheFullInTempFile() (model.File, error) {
|
||||
if file := f.GetFile(); file != nil {
|
||||
return file, nil
|
||||
}
|
||||
|
||||
reader := f.Reader
|
||||
if up != nil {
|
||||
cacheProgress := model.UpdateProgressWithRange(*up, 0, 50)
|
||||
*up = model.UpdateProgressWithRange(*up, 50, 100)
|
||||
reader = &ReaderUpdatingProgress{
|
||||
Reader: &SimpleReaderWithSize{
|
||||
Reader: reader,
|
||||
Size: f.GetSize(),
|
||||
},
|
||||
UpdateProgress: cacheProgress,
|
||||
}
|
||||
tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if writer != nil {
|
||||
reader = io.TeeReader(reader, writer)
|
||||
}
|
||||
f.Reader = reader
|
||||
return f.cache(f.GetSize())
|
||||
f.Add(tmpF)
|
||||
f.tmpFile = tmpF
|
||||
f.Reader = tmpF
|
||||
return tmpF, nil
|
||||
}
|
||||
|
||||
func (f *FileStream) GetFile() model.File {
|
||||
@ -152,80 +106,40 @@ func (f *FileStream) GetFile() model.File {
|
||||
}
|
||||
|
||||
// RangeRead have to cache all data first since only Reader is provided.
|
||||
// It's not thread-safe!
|
||||
// also support a peeking RangeRead at very start, but won't buffer more than conf.MaxBufferLimit data in memory
|
||||
func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
|
||||
if httpRange.Length < 0 || httpRange.Start+httpRange.Length > f.GetSize() {
|
||||
httpRange.Length = f.GetSize() - httpRange.Start
|
||||
}
|
||||
if f.GetFile() != nil {
|
||||
return io.NewSectionReader(f.GetFile(), httpRange.Start, httpRange.Length), nil
|
||||
var cache io.ReaderAt = f.GetFile()
|
||||
if cache != nil {
|
||||
return io.NewSectionReader(cache, httpRange.Start, httpRange.Length), nil
|
||||
}
|
||||
|
||||
size := httpRange.Start + httpRange.Length
|
||||
if f.peekBuff != nil && size <= int64(f.peekBuff.Len()) {
|
||||
return io.NewSectionReader(f.peekBuff, httpRange.Start, httpRange.Length), nil
|
||||
}
|
||||
|
||||
cache, err := f.cache(size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return io.NewSectionReader(cache, httpRange.Start, httpRange.Length), nil
|
||||
}
|
||||
|
||||
// *旧笔记
|
||||
// 使用bytes.Buffer作为io.CopyBuffer的写入对象,CopyBuffer会调用Buffer.ReadFrom
|
||||
// 即使被写入的数据量与Buffer.Cap一致,Buffer也会扩大
|
||||
|
||||
func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
|
||||
if maxCacheSize > int64(conf.MaxBufferLimit) {
|
||||
tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
|
||||
if size <= int64(conf.MaxBufferLimit) {
|
||||
bufSize := min(size, f.GetSize())
|
||||
// 使用bytes.Buffer作为io.CopyBuffer的写入对象,CopyBuffer会调用Buffer.ReadFrom
|
||||
// 即使被写入的数据量与Buffer.Cap一致,Buffer也会扩大
|
||||
buf := make([]byte, bufSize)
|
||||
n, err := io.ReadFull(f.Reader, buf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)
|
||||
}
|
||||
f.peekBuff = bytes.NewReader(buf)
|
||||
f.Reader = io.MultiReader(f.peekBuff, f.Reader)
|
||||
cache = f.peekBuff
|
||||
} else {
|
||||
var err error
|
||||
cache, err = f.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.Add(tmpF)
|
||||
f.tmpFile = tmpF
|
||||
f.Reader = tmpF
|
||||
return tmpF, nil
|
||||
}
|
||||
|
||||
if f.peekBuff == nil {
|
||||
f.peekBuff = &buffer.Reader{}
|
||||
f.oriReader = f.Reader
|
||||
}
|
||||
bufSize := maxCacheSize - int64(f.peekBuff.Len())
|
||||
var buf []byte
|
||||
if conf.MmapThreshold > 0 && bufSize >= int64(conf.MmapThreshold) {
|
||||
m, err := mmap.Alloc(int(bufSize))
|
||||
if err == nil {
|
||||
f.Add(utils.CloseFunc(func() error {
|
||||
return mmap.Free(m)
|
||||
}))
|
||||
buf = m
|
||||
}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, bufSize)
|
||||
}
|
||||
n, err := io.ReadFull(f.oriReader, buf)
|
||||
if bufSize != int64(n) {
|
||||
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)
|
||||
}
|
||||
f.peekBuff.Append(buf)
|
||||
if int64(f.peekBuff.Len()) >= f.GetSize() {
|
||||
f.Reader = f.peekBuff
|
||||
f.oriReader = nil
|
||||
} else {
|
||||
f.Reader = io.MultiReader(f.peekBuff, f.oriReader)
|
||||
}
|
||||
return f.peekBuff, nil
|
||||
}
|
||||
|
||||
func (f *FileStream) SetTmpFile(file model.File) {
|
||||
f.AddIfCloser(file)
|
||||
f.tmpFile = file
|
||||
f.Reader = file
|
||||
return io.NewSectionReader(cache, httpRange.Start, httpRange.Length), nil
|
||||
}
|
||||
|
||||
var _ model.FileStreamer = (*SeekableStream)(nil)
|
||||
@ -242,6 +156,7 @@ type SeekableStream struct {
|
||||
*FileStream
|
||||
// should have one of belows to support rangeRead
|
||||
rangeReadCloser model.RangeReadCloserIF
|
||||
size int64
|
||||
}
|
||||
|
||||
func NewSeekableStream(fs *FileStream, link *model.Link) (*SeekableStream, error) {
|
||||
@ -263,26 +178,38 @@ func NewSeekableStream(fs *FileStream, link *model.Link) (*SeekableStream, error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rrc := &model.RangeReadCloser{
|
||||
RangeReader: rr,
|
||||
}
|
||||
if _, ok := rr.(*model.FileRangeReader); ok {
|
||||
fs.Reader, err = rrc.RangeRead(fs.Ctx, http_range.Range{Length: -1})
|
||||
fs.Reader, err = rr.RangeRead(fs.Ctx, http_range.Range{Length: -1})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs.Add(link)
|
||||
return &SeekableStream{FileStream: fs, size: size}, nil
|
||||
}
|
||||
rrc := &model.RangeReadCloser{
|
||||
RangeReader: rr,
|
||||
}
|
||||
fs.size = size
|
||||
fs.Add(link)
|
||||
fs.Add(rrc)
|
||||
return &SeekableStream{FileStream: fs, rangeReadCloser: rrc}, nil
|
||||
return &SeekableStream{FileStream: fs, rangeReadCloser: rrc, size: size}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("illegal seekableStream")
|
||||
}
|
||||
|
||||
func (ss *SeekableStream) GetSize() int64 {
|
||||
if ss.size > 0 {
|
||||
return ss.size
|
||||
}
|
||||
return ss.FileStream.GetSize()
|
||||
}
|
||||
|
||||
//func (ss *SeekableStream) Peek(length int) {
|
||||
//
|
||||
//}
|
||||
|
||||
// RangeRead is not thread-safe, pls use it in single thread only.
|
||||
func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
|
||||
if ss.GetFile() == nil && ss.rangeReadCloser != nil {
|
||||
if ss.tmpFile == nil && ss.rangeReadCloser != nil {
|
||||
rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, httpRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -292,37 +219,47 @@ func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, erro
|
||||
return ss.FileStream.RangeRead(httpRange)
|
||||
}
|
||||
|
||||
//func (f *FileStream) GetReader() io.Reader {
|
||||
// return f.Reader
|
||||
//}
|
||||
|
||||
// only provide Reader as full stream when it's demanded. in rapid-upload, we can skip this to save memory
|
||||
func (ss *SeekableStream) Read(p []byte) (n int, err error) {
|
||||
if err := ss.generateReader(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return ss.FileStream.Read(p)
|
||||
}
|
||||
|
||||
func (ss *SeekableStream) generateReader() error {
|
||||
if ss.Reader == nil {
|
||||
if ss.rangeReadCloser == nil {
|
||||
return fmt.Errorf("illegal seekableStream")
|
||||
return 0, fmt.Errorf("illegal seekableStream")
|
||||
}
|
||||
rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, http_range.Range{Length: -1})
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
ss.Reader = rc
|
||||
}
|
||||
return nil
|
||||
return ss.Reader.Read(p)
|
||||
}
|
||||
|
||||
func (ss *SeekableStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writer) (model.File, error) {
|
||||
if err := ss.generateReader(); err != nil {
|
||||
func (ss *SeekableStream) CacheFullInTempFile() (model.File, error) {
|
||||
if file := ss.GetFile(); file != nil {
|
||||
return file, nil
|
||||
}
|
||||
tmpF, err := utils.CreateTempFile(ss, ss.GetSize())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ss.FileStream.CacheFullAndWriter(up, writer)
|
||||
ss.Add(tmpF)
|
||||
ss.tmpFile = tmpF
|
||||
ss.Reader = tmpF
|
||||
return tmpF, nil
|
||||
}
|
||||
|
||||
func (f *FileStream) SetTmpFile(r *os.File) {
|
||||
f.Add(r)
|
||||
f.tmpFile = r
|
||||
f.Reader = r
|
||||
}
|
||||
|
||||
type ReaderWithSize interface {
|
||||
io.Reader
|
||||
io.ReadCloser
|
||||
GetSize() int64
|
||||
}
|
||||
|
||||
@ -356,10 +293,7 @@ func (r *ReaderUpdatingProgress) Read(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
func (r *ReaderUpdatingProgress) Close() error {
|
||||
if c, ok := r.Reader.(io.Closer); ok {
|
||||
return c.Close()
|
||||
}
|
||||
return nil
|
||||
return r.Reader.Close()
|
||||
}
|
||||
|
||||
type RangeReadReadAtSeeker struct {
|
||||
@ -377,20 +311,19 @@ type headCache struct {
|
||||
func (c *headCache) head(p []byte) (int, error) {
|
||||
n := 0
|
||||
for _, buf := range c.bufs {
|
||||
n += copy(p[n:], buf)
|
||||
if n == len(p) {
|
||||
if len(buf)+n >= len(p) {
|
||||
n += copy(p[n:], buf[:len(p)-n])
|
||||
return n, nil
|
||||
} else {
|
||||
n += copy(p[n:], buf)
|
||||
}
|
||||
}
|
||||
nn, err := io.ReadFull(c.reader, p[n:])
|
||||
if nn > 0 {
|
||||
buf := make([]byte, nn)
|
||||
copy(buf, p[n:])
|
||||
w, err := io.ReadAtLeast(c.reader, p[n:], 1)
|
||||
if w > 0 {
|
||||
buf := make([]byte, w)
|
||||
copy(buf, p[n:n+w])
|
||||
c.bufs = append(c.bufs, buf)
|
||||
n += nn
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
err = io.EOF
|
||||
}
|
||||
n += w
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
@ -489,9 +422,6 @@ func (r *RangeReadReadAtSeeker) getReaderAtOffset(off int64) (io.Reader, error)
|
||||
}
|
||||
|
||||
func (r *RangeReadReadAtSeeker) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
if off < 0 || off >= r.ss.GetSize() {
|
||||
return 0, io.EOF
|
||||
}
|
||||
if off == 0 && r.headCache != nil {
|
||||
return r.headCache.head(p)
|
||||
}
|
||||
@ -500,15 +430,12 @@ func (r *RangeReadReadAtSeeker) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err = io.ReadFull(rr, p)
|
||||
if n > 0 {
|
||||
off += int64(n)
|
||||
switch err {
|
||||
case nil:
|
||||
r.readerMap.Store(int64(off), rr)
|
||||
case io.ErrUnexpectedEOF:
|
||||
err = io.EOF
|
||||
}
|
||||
n, err = io.ReadAtLeast(rr, p, 1)
|
||||
off += int64(n)
|
||||
if err == nil {
|
||||
r.readerMap.Store(int64(off), rr)
|
||||
} else {
|
||||
rr = nil
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
@ -517,14 +444,20 @@ func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
case io.SeekCurrent:
|
||||
if offset == 0 {
|
||||
return r.masterOff, nil
|
||||
}
|
||||
offset += r.masterOff
|
||||
case io.SeekEnd:
|
||||
offset += r.ss.GetSize()
|
||||
default:
|
||||
return 0, errors.New("Seek: invalid whence")
|
||||
return 0, errs.NotSupport
|
||||
}
|
||||
if offset < 0 || offset > r.ss.GetSize() {
|
||||
return 0, errors.New("Seek: invalid offset")
|
||||
if offset < 0 {
|
||||
return r.masterOff, errors.New("invalid seek: negative position")
|
||||
}
|
||||
if offset > r.ss.GetSize() {
|
||||
offset = r.ss.GetSize()
|
||||
}
|
||||
r.masterOff = offset
|
||||
return offset, nil
|
||||
@ -532,8 +465,6 @@ func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
|
||||
func (r *RangeReadReadAtSeeker) Read(p []byte) (n int, err error) {
|
||||
n, err = r.ReadAt(p, r.masterOff)
|
||||
if n > 0 {
|
||||
r.masterOff += int64(n)
|
||||
}
|
||||
r.masterOff += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
@ -1,88 +0,0 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
||||
)
|
||||
|
||||
func TestFileStream_RangeRead(t *testing.T) {
|
||||
conf.MaxBufferLimit = 16 * 1024 * 1024
|
||||
type args struct {
|
||||
httpRange http_range.Range
|
||||
}
|
||||
buf := []byte("github.com/OpenListTeam/OpenList")
|
||||
f := &FileStream{
|
||||
Obj: &model.Object{
|
||||
Size: int64(len(buf)),
|
||||
},
|
||||
Reader: io.NopCloser(bytes.NewReader(buf)),
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
f *FileStream
|
||||
args args
|
||||
want func(f *FileStream, got io.Reader, err error) error
|
||||
}{
|
||||
{
|
||||
name: "range 11-12",
|
||||
f: f,
|
||||
args: args{
|
||||
httpRange: http_range.Range{Start: 11, Length: 12},
|
||||
},
|
||||
want: func(f *FileStream, got io.Reader, err error) error {
|
||||
if f.GetFile() != nil {
|
||||
return errors.New("cached")
|
||||
}
|
||||
b, _ := io.ReadAll(got)
|
||||
if !bytes.Equal(buf[11:11+12], b) {
|
||||
return fmt.Errorf("=%s ,want =%s", b, buf[11:11+12])
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "range 11-21",
|
||||
f: f,
|
||||
args: args{
|
||||
httpRange: http_range.Range{Start: 11, Length: 21},
|
||||
},
|
||||
want: func(f *FileStream, got io.Reader, err error) error {
|
||||
if f.GetFile() == nil {
|
||||
return errors.New("not cached")
|
||||
}
|
||||
b, _ := io.ReadAll(got)
|
||||
if !bytes.Equal(buf[11:11+21], b) {
|
||||
return fmt.Errorf("=%s ,want =%s", b, buf[11:11+21])
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := tt.f.RangeRead(tt.args.httpRange)
|
||||
if err := tt.want(tt.f, got, err); err != nil {
|
||||
t.Errorf("FileStream.RangeRead() %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
t.Run("after", func(t *testing.T) {
|
||||
if f.GetFile() == nil {
|
||||
t.Error("not cached")
|
||||
}
|
||||
buf2 := make([]byte, len(buf))
|
||||
if _, err := io.ReadFull(f, buf2); err != nil {
|
||||
t.Errorf("FileStream.Read() error = %v", err)
|
||||
}
|
||||
if !bytes.Equal(buf, buf2) {
|
||||
t.Errorf("FileStream.Read() = %s, want %s", buf2, buf)
|
||||
}
|
||||
})
|
||||
}
|
@ -8,14 +8,13 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/net"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/pool"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/rclone/rclone/lib/mmap"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -142,61 +141,81 @@ func (r *ReaderWithCtx) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashType *utils.HashType, hashParams ...any) (model.File, string, error) {
|
||||
func CacheFullInTempFileAndWriter(stream model.FileStreamer, up model.UpdateProgress, w io.Writer) (model.File, error) {
|
||||
if cache := stream.GetFile(); cache != nil {
|
||||
if w != nil {
|
||||
_, err := cache.Seek(0, io.SeekStart)
|
||||
if err == nil {
|
||||
var reader io.Reader = stream
|
||||
if up != nil {
|
||||
reader = &ReaderUpdatingProgress{
|
||||
Reader: stream,
|
||||
UpdateProgress: up,
|
||||
}
|
||||
}
|
||||
_, err = utils.CopyWithBuffer(w, reader)
|
||||
if err == nil {
|
||||
_, err = cache.Seek(0, io.SeekStart)
|
||||
}
|
||||
}
|
||||
return cache, err
|
||||
}
|
||||
if up != nil {
|
||||
up(100)
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
var reader io.Reader = stream
|
||||
if up != nil {
|
||||
reader = &ReaderUpdatingProgress{
|
||||
Reader: stream,
|
||||
UpdateProgress: up,
|
||||
}
|
||||
}
|
||||
|
||||
if w != nil {
|
||||
reader = io.TeeReader(reader, w)
|
||||
}
|
||||
tmpF, err := utils.CreateTempFile(reader, stream.GetSize())
|
||||
if err == nil {
|
||||
stream.SetTmpFile(tmpF)
|
||||
}
|
||||
return tmpF, err
|
||||
}
|
||||
|
||||
func CacheFullInTempFileAndHash(stream model.FileStreamer, up model.UpdateProgress, hashType *utils.HashType, hashParams ...any) (model.File, string, error) {
|
||||
h := hashType.NewFunc(hashParams...)
|
||||
tmpF, err := stream.CacheFullAndWriter(up, h)
|
||||
tmpF, err := CacheFullInTempFileAndWriter(stream, up, h)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
return tmpF, hex.EncodeToString(h.Sum(nil)), nil
|
||||
return tmpF, hex.EncodeToString(h.Sum(nil)), err
|
||||
}
|
||||
|
||||
type StreamSectionReader struct {
|
||||
file model.FileStreamer
|
||||
off int64
|
||||
bufPool *pool.Pool[[]byte]
|
||||
bufPool *sync.Pool
|
||||
}
|
||||
|
||||
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (*StreamSectionReader, error) {
|
||||
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int) (*StreamSectionReader, error) {
|
||||
ss := &StreamSectionReader{file: file}
|
||||
if file.GetFile() != nil {
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
maxBufferSize = min(maxBufferSize, int(file.GetSize()))
|
||||
if maxBufferSize > conf.MaxBufferLimit {
|
||||
_, err := file.CacheFullAndWriter(up, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ss, nil
|
||||
}
|
||||
if conf.MmapThreshold > 0 && maxBufferSize >= conf.MmapThreshold {
|
||||
ss.bufPool = &pool.Pool[[]byte]{
|
||||
New: func() []byte {
|
||||
buf, err := mmap.Alloc(maxBufferSize)
|
||||
if err == nil {
|
||||
file.Add(utils.CloseFunc(func() error {
|
||||
return mmap.Free(buf)
|
||||
}))
|
||||
} else {
|
||||
buf = make([]byte, maxBufferSize)
|
||||
}
|
||||
return buf
|
||||
},
|
||||
}
|
||||
} else {
|
||||
ss.bufPool = &pool.Pool[[]byte]{
|
||||
New: func() []byte {
|
||||
return make([]byte, maxBufferSize)
|
||||
},
|
||||
if file.GetFile() == nil {
|
||||
maxBufferSize = min(maxBufferSize, int(file.GetSize()))
|
||||
if maxBufferSize > conf.MaxBufferLimit {
|
||||
_, err := file.CacheFullInTempFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
ss.bufPool = &sync.Pool{
|
||||
New: func() any {
|
||||
return make([]byte, maxBufferSize)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
file.Add(utils.CloseFunc(func() error {
|
||||
ss.bufPool.Reset()
|
||||
return nil
|
||||
}))
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
@ -208,7 +227,7 @@ func (ss *StreamSectionReader) GetSectionReader(off, length int64) (*SectionRead
|
||||
if off != ss.off {
|
||||
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
|
||||
}
|
||||
tempBuf := ss.bufPool.Get()
|
||||
tempBuf := ss.bufPool.Get().([]byte)
|
||||
buf = tempBuf[:length]
|
||||
n, err := io.ReadFull(ss.file, buf)
|
||||
if int64(n) != length {
|
||||
@ -221,7 +240,7 @@ func (ss *StreamSectionReader) GetSectionReader(off, length int64) (*SectionRead
|
||||
return &SectionReader{io.NewSectionReader(cache, off, length), buf}, nil
|
||||
}
|
||||
|
||||
func (ss *StreamSectionReader) FreeSectionReader(sr *SectionReader) {
|
||||
func (ss *StreamSectionReader) RecycleSectionReader(sr *SectionReader) {
|
||||
if sr != nil {
|
||||
if sr.buf != nil {
|
||||
ss.bufPool.Put(sr.buf[0:cap(sr.buf)])
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user