@@ -35,10 +35,12 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
3535
3636 protected initialized : Promise < void > ;
3737
38- protected readConnections : OPSQLiteConnection [ ] | null ;
38+ protected readConnections : Array < { busy : boolean ; connection : OPSQLiteConnection } > | null ;
3939
4040 protected writeConnection : OPSQLiteConnection | null ;
4141
42+ private readQueue : Array < ( ) => void > = [ ] ;
43+
4244 constructor ( protected options : OPSQLiteAdapterOptions ) {
4345 super ( ) ;
4446 this . name = this . options . name ;
@@ -88,7 +90,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
8890 let dbName = './' . repeat ( i + 1 ) + dbFilename ;
8991 const conn = await this . openConnection ( dbName ) ;
9092 await conn . execute ( 'PRAGMA query_only = true' ) ;
91- this . readConnections . push ( conn ) ;
93+ this . readConnections . push ( { busy : false , connection : conn } ) ;
9294 }
9395 }
9496
@@ -145,36 +147,46 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
145147 close ( ) {
146148 this . initialized . then ( ( ) => {
147149 this . writeConnection ! . close ( ) ;
148- this . readConnections ! . forEach ( ( c ) => c . close ( ) ) ;
150+ this . readConnections ! . forEach ( ( c ) => c . connection . close ( ) ) ;
149151 } ) ;
150152 }
151153
152154 async readLock < T > ( fn : ( tx : OPSQLiteConnection ) => Promise < T > , options ?: DBLockOptions ) : Promise < T > {
153155 await this . initialized ;
154- // TODO: Use async queues to handle multiple read connections
155- const sortedConnections = this . readConnections ! . map ( ( connection , index ) => ( {
156- lockKey : `${ LockType . READ } -${ index } ` ,
157- connection
158- } ) ) . sort ( ( a , b ) => {
159- const aBusy = this . locks . isBusy ( a . lockKey ) ;
160- const bBusy = this . locks . isBusy ( b . lockKey ) ;
161- // Sort by ones which are not busy
162- return aBusy > bBusy ? 1 : 0 ;
156+ return new Promise ( async ( resolve , reject ) => {
157+ const execute = async ( ) => {
158+ // Find an available connection that is not busy
159+ const availableConnection = this . readConnections ! . find ( ( conn ) => ! conn . busy ) ;
160+
161+ // If we have an available connection, use it
162+ if ( availableConnection ) {
163+ availableConnection . busy = true ;
164+ try {
165+ resolve ( await fn ( availableConnection . connection ) ) ;
166+ } catch ( error ) {
167+ reject ( error ) ;
168+ } finally {
169+ availableConnection . busy = false ;
170+ // After query execution, process any queued tasks
171+ this . processQueue ( ) ;
172+ }
173+ } else {
174+ // If no available connections, add to the queue
175+ this . readQueue . push ( execute ) ;
176+ }
177+ } ;
178+
179+ execute ( ) ;
163180 } ) ;
181+ }
164182
165- return new Promise ( async ( resolve , reject ) => {
166- try {
167- await this . locks . acquire (
168- sortedConnections [ 0 ] . lockKey ,
169- async ( ) => {
170- resolve ( await fn ( sortedConnections [ 0 ] . connection ) ) ;
171- } ,
172- { timeout : options ?. timeoutMs }
173- ) ;
174- } catch ( ex ) {
175- reject ( ex ) ;
183+ private async processQueue ( ) : Promise < void > {
184+ if ( this . readQueue . length > 0 ) {
185+ const next = this . readQueue . shift ( ) ;
186+ if ( next ) {
187+ next ( ) ;
176188 }
177- } ) ;
189+ }
178190 }
179191
180192 async writeLock < T > ( fn : ( tx : OPSQLiteConnection ) => Promise < T > , options ?: DBLockOptions ) : Promise < T > {
0 commit comments